You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/07/05 18:07:47 UTC

[8/8] incubator-eagle git commit: EAGLE-276 eagle support for mr & spark history job monitoring mr & spark job history monitoring

EAGLE-276 eagle support for mr & spark history job monitoring
mr & spark job history monitoring

Author: @wujinhu <ji...@ebay.com>
Reviewer: @yonzhang <yo...@apache.org>

Closes: #217


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/fe509125
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/fe509125
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/fe509125

Branch: refs/heads/develop
Commit: fe50912574dfe2126da0154b8544d81022126acc
Parents: f857cbe
Author: yonzhang <yo...@gmail.com>
Authored: Tue Jul 5 11:10:45 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Tue Jul 5 11:10:45 2016 -0700

----------------------------------------------------------------------
 .../apache/eagle/common/SerializableUtils.java  | 126 ++++
 .../entity/meta/DefaultJavaObjctSerDeser.java   |  41 ++
 .../entity/meta/EntityDefinitionManager.java    |  21 +-
 eagle-jpm/eagle-jpa-spark-history/pom.xml       |  66 ++
 eagle-jpm/eagle-jpa-spark-running/pom.xml       |  66 ++
 eagle-jpm/eagle-jpm-entity/pom.xml              |  52 ++
 .../eagle/jpm/entity/JPMEntityRepository.java   |  30 +
 .../org/apache/eagle/jpm/entity/JobConfig.java  |  38 +
 .../org/apache/eagle/jpm/entity/SparkApp.java   | 428 ++++++++++++
 .../apache/eagle/jpm/entity/SparkExecutor.java  | 233 +++++++
 .../org/apache/eagle/jpm/entity/SparkJob.java   | 178 +++++
 .../org/apache/eagle/jpm/entity/SparkStage.java | 299 ++++++++
 .../org/apache/eagle/jpm/entity/SparkTask.java  | 290 ++++++++
 eagle-jpm/eagle-jpm-mr-history/pom.xml          | 138 ++++
 .../assembly/eagle-jpm-mr-history-assembly.xml  |  65 ++
 .../eagle/jpm/mr/history/MRHistoryJobMain.java  |  87 +++
 .../jpm/mr/history/common/JHFConfigManager.java | 182 +++++
 .../jpm/mr/history/common/JPAConstants.java     |  95 +++
 .../eagle/jpm/mr/history/common/JobConfig.java  |  38 +
 .../history/crawler/AbstractJobHistoryDAO.java  | 194 +++++
 .../crawler/DefaultJHFInputStreamCallback.java  |  66 ++
 .../mr/history/crawler/JHFCrawlerDriver.java    |  27 +
 .../history/crawler/JHFCrawlerDriverImpl.java   | 277 ++++++++
 .../history/crawler/JHFInputStreamCallback.java |  37 +
 .../crawler/JobHistoryContentFilter.java        |  36 +
 .../crawler/JobHistoryContentFilterBuilder.java |  91 +++
 .../crawler/JobHistoryContentFilterImpl.java    |  94 +++
 .../mr/history/crawler/JobHistoryDAOImpl.java   | 203 ++++++
 .../jpm/mr/history/crawler/JobHistoryLCM.java   |  86 +++
 .../JobHistorySpoutCollectorInterceptor.java    |  36 +
 .../history/entities/JPAEntityRepository.java   |  40 ++
 .../mr/history/entities/JobBaseAPIEntity.java   |  24 +
 .../mr/history/entities/JobConfigSerDeser.java  |  63 ++
 .../entities/JobConfigurationAPIEntity.java     |  67 ++
 .../history/entities/JobCountersSerDeser.java   | 166 +++++
 .../mr/history/entities/JobEventAPIEntity.java  |  44 ++
 .../history/entities/JobExecutionAPIEntity.java | 132 ++++
 .../entities/JobProcessTimeStampEntity.java     |  44 ++
 .../entities/TaskAttemptCounterAPIEntity.java   |  61 ++
 .../entities/TaskAttemptExecutionAPIEntity.java | 101 +++
 .../entities/TaskExecutionAPIEntity.java        |  89 +++
 .../entities/TaskFailureCountAPIEntity.java     |  67 ++
 .../jobcounter/CounterGroupDictionary.java      | 238 +++++++
 .../mr/history/jobcounter/CounterGroupKey.java  |  32 +
 .../jpm/mr/history/jobcounter/CounterKey.java   |  30 +
 .../history/jobcounter/JobCounterException.java |  63 ++
 .../jpm/mr/history/jobcounter/JobCounters.java  |  47 ++
 .../jpm/mr/history/parser/EagleJobStatus.java   |  28 +
 .../jpm/mr/history/parser/EagleJobTagName.java  |  48 ++
 .../jpm/mr/history/parser/EagleTaskStatus.java  |  25 +
 .../HistoryJobEntityCreationListener.java       |  39 ++
 .../HistoryJobEntityLifecycleListener.java      |  34 +
 .../jpm/mr/history/parser/ImportException.java  |  33 +
 .../mr/history/parser/JHFEventReaderBase.java   | 405 +++++++++++
 .../eagle/jpm/mr/history/parser/JHFFormat.java  |  24 +
 .../mr/history/parser/JHFMRVer1EventReader.java | 150 ++++
 .../jpm/mr/history/parser/JHFMRVer1Parser.java  | 271 +++++++
 .../parser/JHFMRVer1PerLineListener.java        |  39 ++
 .../mr/history/parser/JHFMRVer2EventReader.java | 380 ++++++++++
 .../jpm/mr/history/parser/JHFMRVer2Parser.java  |  87 +++
 .../jpm/mr/history/parser/JHFParserBase.java    |  35 +
 .../jpm/mr/history/parser/JHFParserFactory.java |  71 ++
 .../parser/JHFWriteNotCompletedException.java   |  36 +
 ...JobConfigurationCreationServiceListener.java |  92 +++
 .../JobEntityCreationEagleServiceListener.java  | 127 ++++
 .../parser/JobEntityCreationPublisher.java      |  47 ++
 .../parser/JobEntityLifecycleAggregator.java    | 176 +++++
 .../mr/history/parser/MRErrorClassifier.java    | 112 +++
 .../jpm/mr/history/parser/RecordTypes.java      |  26 +
 .../parser/TaskAttemptCounterListener.java      | 152 ++++
 .../mr/history/parser/TaskFailureListener.java  | 137 ++++
 .../history/storm/DefaultJobIdPartitioner.java  |  28 +
 .../history/storm/HistoryJobProgressBolt.java   | 132 ++++
 .../jpm/mr/history/storm/JobHistorySpout.java   | 208 ++++++
 .../eagle/jpm/mr/history/storm/JobIdFilter.java |  23 +
 .../history/storm/JobIdFilterByPartition.java   |  40 ++
 .../jpm/mr/history/storm/JobIdPartitioner.java  |  23 +
 .../mr/history/zkres/JobHistoryZKStateLCM.java  |  31 +
 .../history/zkres/JobHistoryZKStateManager.java | 305 ++++++++
 .../src/main/resources/JobCounter.conf          | 185 +++++
 .../services/org.apache.hadoop.fs.FileSystem    |  20 +
 .../src/main/resources/MRErrorCategory.config   |  41 ++
 .../src/main/resources/application.conf         |  85 +++
 .../src/main/resources/core-site.xml            | 497 +++++++++++++
 .../src/main/resources/hdfs-site.xml            | 449 ++++++++++++
 .../src/main/resources/log4j.properties         |  34 +
 eagle-jpm/eagle-jpm-spark-history/pom.xml       | 122 ++++
 .../eagle-jpm-spark-history-assembly.xml        |  65 ++
 .../history/config/SparkHistoryCrawlConfig.java | 122 ++++
 .../jpm/spark/history/crawl/EventType.java      |  24 +
 .../history/crawl/JHFInputStreamReader.java     |  25 +
 .../jpm/spark/history/crawl/JHFParserBase.java  |  29 +
 .../history/crawl/JHFSparkEventReader.java      | 699 +++++++++++++++++++
 .../jpm/spark/history/crawl/JHFSparkParser.java |  63 ++
 .../history/crawl/SparkApplicationInfo.java     |  69 ++
 .../SparkHistoryFileInputStreamReaderImpl.java  |  53 ++
 .../status/JobHistoryZKStateManager.java        | 262 +++++++
 .../spark/history/status/ZKStateConstant.java   |  27 +
 .../history/storm/FinishedSparkJobSpout.java    | 152 ++++
 .../history/storm/SparkHistoryTopology.java     |  81 +++
 .../spark/history/storm/SparkJobParseBolt.java  | 178 +++++
 .../eagle/jpm/spark/history/storm/TestHDFS.java |  47 ++
 .../services/org.apache.hadoop.fs.FileSystem    |  20 +
 .../src/main/resources/application.conf         |  77 ++
 .../src/main/resources/log4j.properties         |  35 +
 eagle-jpm/eagle-jpm-spark-running/pom.xml       |  66 ++
 eagle-jpm/eagle-jpm-util/pom.xml                |  65 ++
 .../org/apache/eagle/jpm/util/Constants.java    |  49 ++
 .../org/apache/eagle/jpm/util/HDFSUtil.java     |  44 ++
 .../org/apache/eagle/jpm/util/JSONUtil.java     |  66 ++
 .../eagle/jpm/util/JobNameNormalization.java    | 118 ++++
 .../eagle/jpm/util/SparkEntityConstant.java     |  29 +
 .../apache/eagle/jpm/util/SparkJobTagName.java  |  44 ++
 .../util/resourceFetch/RMResourceFetcher.java   |  98 +++
 .../jpm/util/resourceFetch/ResourceFetcher.java |  27 +
 .../SparkHistoryServerResourceFetcher.java      |  81 +++
 .../connection/InputStreamUtils.java            |  69 ++
 .../util/resourceFetch/connection/JobUtils.java |  43 ++
 .../connection/URLConnectionUtils.java          | 102 +++
 .../util/resourceFetch/ha/HAURLSelector.java    |  28 +
 .../resourceFetch/ha/HAURLSelectorImpl.java     | 101 +++
 .../jpm/util/resourceFetch/model/AppInfo.java   | 146 ++++
 .../util/resourceFetch/model/Applications.java  |  38 +
 .../util/resourceFetch/model/AppsWrapper.java   |  36 +
 .../resourceFetch/model/SparkApplication.java   |  57 ++
 .../model/SparkApplicationAttempt.java          |  73 ++
 .../model/SparkApplicationWrapper.java          |  38 +
 .../url/JobListServiceURLBuilderImpl.java       |  37 +
 .../resourceFetch/url/ServiceURLBuilder.java    |  21 +
 .../SparkCompleteJobServiceURLBuilderImpl.java  |  29 +
 .../url/SparkJobServiceURLBuilderImpl.java      |  29 +
 .../src/main/resources/application.properties   |  23 +
 eagle-jpm/pom.xml                               |  54 ++
 133 files changed, 13354 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/SerializableUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/SerializableUtils.java b/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/SerializableUtils.java
new file mode 100644
index 0000000..c5823ea
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/SerializableUtils.java
@@ -0,0 +1,126 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.common;
+
+import org.xerial.snappy.SnappyInputStream;
+import org.xerial.snappy.SnappyOutputStream;
+
+import java.io.*;
+
+/**
+ * Utilities for working with Serializables.
+ *
+ * Derived from "com.google.cloud.dataflow.sdk.util.SerializableUtils":
+ * https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/SerializableUtils.java
+ */
+public class SerializableUtils {
+  /**
+   * Serializes the argument into an array of bytes, and returns it.
+   *
+   * @throws IllegalArgumentException if there are errors when serializing
+   */
+  public static byte[] serializeToCompressedByteArray(Object value) {
+    try {
+      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+      try (ObjectOutputStream oos = new ObjectOutputStream(new SnappyOutputStream(buffer))) {
+        oos.writeObject(value);
+      }
+      return buffer.toByteArray();
+    } catch (IOException exn) {
+      throw new IllegalArgumentException(
+          "unable to serialize " + value,
+          exn);
+    }
+  }
+
+  /**
+   * Serializes the argument into an array of bytes, and returns it.
+   *
+   * @throws IllegalArgumentException if there are errors when serializing
+   */
+  public static byte[] serializeToByteArray(Object value) {
+    try {
+      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+      try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
+        oos.writeObject(value);
+      }
+      return buffer.toByteArray();
+    } catch (IOException exn) {
+      throw new IllegalArgumentException("unable to serialize " + value, exn);
+    }
+  }
+
+  /**
+   * Deserializes an object from the given array of bytes, e.g., as
+   * serialized using {@link #serializeToCompressedByteArray}, and returns it.
+   *
+   * @throws IllegalArgumentException if there are errors when
+   * deserializing, using the provided description to identify what
+   * was being deserialized
+   */
+  public static Object deserializeFromByteArray(byte[] encodedValue,
+                                                          String description) {
+    try {
+      try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(encodedValue))) {
+        return ois.readObject();
+      }
+    } catch (IOException | ClassNotFoundException exn) {
+      throw new IllegalArgumentException(
+          "unable to deserialize " + description,
+          exn);
+    }
+  }
+
+  /**
+   * Deserializes an object from the given array of bytes, e.g., as
+   * serialized using {@link #serializeToCompressedByteArray}, and returns it.
+   *
+   * @throws IllegalArgumentException if there are errors when
+   * deserializing, using the provided description to identify what
+   * was being deserialized
+   */
+  public static Object deserializeFromCompressedByteArray(byte[] encodedValue,
+                                                          String description) {
+    try {
+      try (ObjectInputStream ois = new ObjectInputStream(
+          new SnappyInputStream(new ByteArrayInputStream(encodedValue)))) {
+        return ois.readObject();
+      }
+    } catch (IOException | ClassNotFoundException exn) {
+      throw new IllegalArgumentException(
+          "unable to deserialize " + description,
+          exn);
+    }
+  }
+
+  public static <T extends Serializable> T ensureSerializable(T value) {
+    @SuppressWarnings("unchecked")
+    T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value),
+        value.toString());
+    return copy;
+  }
+
+  public static <T extends Serializable> T clone(T value) {
+    @SuppressWarnings("unchecked")
+    T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value),
+        value.toString());
+    return copy;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DefaultJavaObjctSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DefaultJavaObjctSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DefaultJavaObjctSerDeser.java
new file mode 100644
index 0000000..24385a9
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DefaultJavaObjctSerDeser.java
@@ -0,0 +1,41 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.log.entity.meta;
+
+import org.apache.eagle.common.SerializableUtils;
+
+public class DefaultJavaObjctSerDeser implements EntitySerDeser<Object> {
+    public final static  EntitySerDeser<Object> INSTANCE = new DefaultJavaObjctSerDeser();
+
+    @Override
+    public Object deserialize(byte[] bytes) {
+        return SerializableUtils.deserializeFromByteArray(bytes,"Deserialize from java object bytes");
+    }
+
+    @Override
+    public byte[] serialize(Object o) {
+        return SerializableUtils.serializeToByteArray(o);
+    }
+
+    @Override
+    public Class<Object> type() {
+        return Object.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
index d990fb5..7b1010d 100755
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityDefinitionManager.java
@@ -16,12 +16,6 @@
  */
 package org.apache.eagle.log.entity.meta;
 
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.eagle.common.config.EagleConfigFactory;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.repo.EntityRepositoryScanner;
@@ -31,6 +25,12 @@ import org.mockito.cglib.core.Predicate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * static initialization of all registered entities. As of now, dynamic registration is not supported
  */
@@ -365,11 +365,12 @@ public class EntityDefinitionManager {
 			q.setQualifierName(column.value());
 			EntitySerDeser<?> serDeser = _serDeserMap.get(fldCls); 
 			if(serDeser == null){
-				throw new IllegalArgumentException(fldCls.getName() + " in field " + f.getName() + 
-						" of entity " + cls.getSimpleName() + " has no serializer associated ");
-			} else {
-				q.setSerDeser((EntitySerDeser<Object>)serDeser);
+//				throw new IllegalArgumentException(fldCls.getName() + " in field " + f.getName() +
+//						" of entity " + cls.getSimpleName() + " has no serializer associated ");
+				serDeser = DefaultJavaObjctSerDeser.INSTANCE;
 			}
+
+			q.setSerDeser((EntitySerDeser<Object>)serDeser);
 			ed.getQualifierNameMap().put(q.getQualifierName(), q);
 			ed.getDisplayNameMap().put(q.getDisplayName(), q);
 			// TODO: should refine rules, consider fields like "hCol", getter method should be gethCol() according to org.apache.commons.beanutils.PropertyUtils

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpa-spark-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpa-spark-history/pom.xml b/eagle-jpm/eagle-jpa-spark-history/pom.xml
new file mode 100644
index 0000000..cc293b6
--- /dev/null
+++ b/eagle-jpm/eagle-jpa-spark-history/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.eagle</groupId>
+    <artifactId>eagle-jpm-parent</artifactId>
+    <version>0.3.0-incubating</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <artifactId>eagle-jpm-spark-history</artifactId>
+  <name>eagle-jpm-spark-history</name>
+  <url>http://maven.apache.org</url>
+  <dependencies>
+  	<dependency>
+  		<groupId>org.slf4j</groupId>
+  		<artifactId>slf4j-api</artifactId>
+  	</dependency>
+  	<dependency>
+  		<groupId>org.apache.eagle</groupId>
+  		<artifactId>eagle-stream-process-api</artifactId>
+        <version>${project.version}</version>
+  	</dependency>
+      <dependency>
+          <groupId>org.apache.eagle</groupId>
+          <artifactId>eagle-stream-process-base</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+  	<dependency>
+  		<groupId>org.apache.eagle</groupId>
+  		<artifactId>eagle-job-common</artifactId>
+  		<version>${project.version}</version>
+  	</dependency>  	  	
+  	<dependency>
+		<groupId>org.jsoup</groupId>
+		<artifactId>jsoup</artifactId>
+	</dependency>
+  	<dependency>
+  		<groupId>org.apache.storm</groupId>
+  		<artifactId>storm-core</artifactId>
+  		<exclusions>
+      		<exclusion>
+      			<groupId>ch.qos.logback</groupId>
+        		<artifactId>logback-classic</artifactId>
+      		</exclusion>
+      	</exclusions> 
+  	</dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpa-spark-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpa-spark-running/pom.xml b/eagle-jpm/eagle-jpa-spark-running/pom.xml
new file mode 100644
index 0000000..42c476a
--- /dev/null
+++ b/eagle-jpm/eagle-jpa-spark-running/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.eagle</groupId>
+    <artifactId>eagle-jpm-parent</artifactId>
+    <version>0.3.0-incubating</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <artifactId>eagle-jpm-spark-running</artifactId>
+  <name>eagle-jpm-spark-running</name>
+  <url>http://maven.apache.org</url>
+  <dependencies>
+  	<dependency>
+  		<groupId>org.slf4j</groupId>
+  		<artifactId>slf4j-api</artifactId>
+  	</dependency>
+  	<dependency>
+  		<groupId>org.apache.eagle</groupId>
+  		<artifactId>eagle-stream-process-api</artifactId>
+        <version>${project.version}</version>
+  	</dependency>
+      <dependency>
+          <groupId>org.apache.eagle</groupId>
+          <artifactId>eagle-stream-process-base</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+  	<dependency>
+  		<groupId>org.apache.eagle</groupId>
+  		<artifactId>eagle-job-common</artifactId>
+  		<version>${project.version}</version>
+  	</dependency>  	  	
+  	<dependency>
+		<groupId>org.jsoup</groupId>
+		<artifactId>jsoup</artifactId>
+	</dependency>
+  	<dependency>
+  		<groupId>org.apache.storm</groupId>
+  		<artifactId>storm-core</artifactId>
+  		<exclusions>
+      		<exclusion>
+      			<groupId>ch.qos.logback</groupId>
+        		<artifactId>logback-classic</artifactId>
+      		</exclusion>
+      	</exclusions> 
+  	</dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/pom.xml b/eagle-jpm/eagle-jpm-entity/pom.xml
new file mode 100644
index 0000000..29be4ab
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>eagle-jpm-parent</artifactId>
+        <groupId>org.apache.eagle</groupId>
+        <version>0.5.0-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>eagle-jpm-entity</artifactId>
+    <packaging>jar</packaging>
+
+    <name>eagle-jpm-entity</name>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>3.8.1</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JPMEntityRepository.java
new file mode 100644
index 0000000..f54688b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JPMEntityRepository.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.entity;
+
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+public class JPMEntityRepository extends EntityRepository {
+    public JPMEntityRepository() {
+        entitySet.add(SparkApp.class);
+        entitySet.add(SparkJob.class);
+        entitySet.add(SparkStage.class);
+        entitySet.add(SparkTask.class);
+        entitySet.add(SparkExecutor.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JobConfig.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JobConfig.java
new file mode 100644
index 0000000..de3bd7a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/JobConfig.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.entity;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class JobConfig implements Serializable {
+    private Map<String, String> config = new TreeMap<>();
+
+    public Map<String, String> getConfig() {
+        return config;
+    }
+
+    public void setConfig(Map<String, String> config) {
+        this.config = config;
+    }
+    @Override
+    public String toString(){
+        return config.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkApp.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkApp.java
new file mode 100644
index 0000000..1760753
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkApp.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eglesprk_apps")
+@ColumnFamily("f")
+@Prefix("sprkapp")
+@Service(Constants.SPARK_APP_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName","user", "queue"})
+@Partition({"site"})
+public class SparkApp extends TaggedLogAPIEntity{
+
+    @Column("a")
+    private long  startTime;
+    @Column("b")
+    private long endTime;
+    @Column("c")
+    private String yarnState;
+    @Column("d")
+    private String yarnStatus;
+    @Column("e")
+    private JobConfig config;
+    @Column("f")
+    private int numJobs;
+    @Column("g")
+    private int totalStages;
+    @Column("h")
+    private int skippedStages;
+    @Column("i")
+    private int failedStages;
+    @Column("j")
+    private int totalTasks;
+    @Column("k")
+    private int skippedTasks;
+    @Column("l")
+    private int failedTasks;
+    @Column("m")
+    private int executors;
+    @Column("n")
+    private long inputBytes;
+    @Column("o")
+    private long inputRecords;
+    @Column("p")
+    private long outputBytes;
+    @Column("q")
+    private long outputRecords;
+    @Column("r")
+    private long shuffleReadBytes;
+    @Column("s")
+    private long shuffleReadRecords;
+    @Column("t")
+    private long shuffleWriteBytes;
+    @Column("u")
+    private long shuffleWriteRecords;
+    @Column("v")
+    private long executorDeserializeTime;
+    @Column("w")
+    private long executorRunTime;
+    @Column("x")
+    private long resultSize;
+    @Column("y")
+    private long jvmGcTime;
+    @Column("z")
+    private long resultSerializationTime;
+    @Column("ab")
+    private long memoryBytesSpilled;
+    @Column("ac")
+    private long diskBytesSpilled;
+    @Column("ad")
+    private long execMemoryBytes;
+    @Column("ae")
+    private long driveMemoryBytes;
+    @Column("af")
+    private int completeTasks;
+    @Column("ag")
+    private long totalExecutorTime;
+    @Column("ah")
+    private long executorMemoryOverhead;
+    @Column("ai")
+    private long driverMemoryOverhead;
+    @Column("aj")
+    private int executorCores;
+    @Column("ak")
+    private int driverCores;
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public String getYarnState() {
+        return yarnState;
+    }
+
+    public String getYarnStatus() {
+        return yarnStatus;
+    }
+
+    public int getNumJobs() {
+        return numJobs;
+    }
+
+    public int getTotalStages() {
+        return totalStages;
+    }
+
+    public int getSkippedStages() {
+        return skippedStages;
+    }
+
+    public int getFailedStages() {
+        return failedStages;
+    }
+
+    public int getTotalTasks() {
+        return totalTasks;
+    }
+
+    public int getSkippedTasks() {
+        return skippedTasks;
+    }
+
+    public int getFailedTasks() {
+        return failedTasks;
+    }
+
+    public int getExecutors() {
+        return executors;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public long getOutputRecords() {
+        return outputRecords;
+    }
+
+    public long getShuffleReadBytes() {
+        return shuffleReadBytes;
+    }
+
+    public long getShuffleReadRecords() {
+        return shuffleReadRecords;
+    }
+
+    public long getShuffleWriteBytes() {
+        return shuffleWriteBytes;
+    }
+
+    public long getShuffleWriteRecords() {
+        return shuffleWriteRecords;
+    }
+
+    public long getExecutorDeserializeTime() {
+        return executorDeserializeTime;
+    }
+
+    public long getExecutorRunTime() {
+        return executorRunTime;
+    }
+
+    public long getResultSize() {
+        return resultSize;
+    }
+
+    public long getJvmGcTime() {
+        return jvmGcTime;
+    }
+
+    public long getResultSerializationTime() {
+        return resultSerializationTime;
+    }
+
+    public long getMemoryBytesSpilled() {
+        return memoryBytesSpilled;
+    }
+
+    public long getDiskBytesSpilled() {
+        return diskBytesSpilled;
+    }
+
+    public long getExecMemoryBytes() {
+        return execMemoryBytes;
+    }
+
+    public long getDriveMemoryBytes() {
+        return driveMemoryBytes;
+    }
+
+    public int getCompleteTasks(){ return completeTasks;}
+
+    public JobConfig getConfig() {
+        return config;
+    }
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        valueChanged("endTime");
+    }
+
+    public void setYarnState(String yarnState) {
+        this.yarnState = yarnState;
+        valueChanged("yarnState");
+    }
+
+    public void setYarnStatus(String yarnStatus) {
+        this.yarnStatus = yarnStatus;
+        valueChanged("yarnStatus");
+    }
+
+    public void setConfig(JobConfig config) {
+        this.config = config;
+        valueChanged("config");
+    }
+
+    public void setNumJobs(int numJobs) {
+        this.numJobs = numJobs;
+        valueChanged("numJobs");
+    }
+
+    public void setTotalStages(int totalStages) {
+        this.totalStages = totalStages;
+        valueChanged("totalStages");
+    }
+
+    public void setSkippedStages(int skippedStages) {
+        this.skippedStages = skippedStages;
+        valueChanged("skippedStages");
+    }
+
+    public void setFailedStages(int failedStages) {
+        this.failedStages = failedStages;
+        valueChanged("failedStages");
+    }
+
+    public void setTotalTasks(int totalTasks) {
+        this.totalTasks = totalTasks;
+        valueChanged("totalTasks");
+    }
+
+    public void setSkippedTasks(int skippedTasks) {
+        this.skippedTasks = skippedTasks;
+        valueChanged("skippedTasks");
+    }
+
+    public void setFailedTasks(int failedTasks) {
+        this.failedTasks = failedTasks;
+        valueChanged("failedTasks");
+    }
+
+    public void setExecutors(int executors) {
+        this.executors = executors;
+        valueChanged("executors");
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+        valueChanged("inputBytes");
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+        valueChanged("inputRecords");
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+        valueChanged("outputBytes");
+    }
+
+    public void setOutputRecords(long outputRecords) {
+        this.outputRecords = outputRecords;
+        valueChanged("outputRecords");
+    }
+
+    public void setShuffleReadBytes(long shuffleReadRemoteBytes) {
+        this.shuffleReadBytes = shuffleReadRemoteBytes;
+        valueChanged("shuffleReadBytes");
+    }
+
+    public void setShuffleReadRecords(long shuffleReadRecords) {
+        this.shuffleReadRecords = shuffleReadRecords;
+        valueChanged("shuffleReadRecords");
+    }
+
+    public void setShuffleWriteBytes(long shuffleWriteBytes) {
+        this.shuffleWriteBytes = shuffleWriteBytes;
+        valueChanged("shuffleWriteBytes");
+    }
+
+    public void setShuffleWriteRecords(long shuffleWriteRecords) {
+        this.shuffleWriteRecords = shuffleWriteRecords;
+        valueChanged("shuffleWriteRecords");
+    }
+
+    public void setExecutorDeserializeTime(long executorDeserializeTime) {
+        this.executorDeserializeTime = executorDeserializeTime;
+        valueChanged("executorDeserializeTime");
+    }
+
+    public void setExecutorRunTime(long executorRunTime) {
+        this.executorRunTime = executorRunTime;
+        valueChanged("executorRunTime");
+    }
+
+    public void setResultSize(long resultSize) {
+        this.resultSize = resultSize;
+        valueChanged("resultSize");
+    }
+
+    public void setJvmGcTime(long jvmGcTime) {
+        this.jvmGcTime = jvmGcTime;
+        valueChanged("jvmGcTime");
+    }
+
+    public void setResultSerializationTime(long resultSerializationTime) {
+        this.resultSerializationTime = resultSerializationTime;
+        valueChanged("resultSerializationTime");
+    }
+
+    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+        this.memoryBytesSpilled = memoryBytesSpilled;
+        valueChanged("memoryBytesSpilled");
+    }
+
+    public void setDiskBytesSpilled(long diskBytesSpilled) {
+        this.diskBytesSpilled = diskBytesSpilled;
+        valueChanged("diskBytesSpilled");
+    }
+
+    public void setExecMemoryBytes(long execMemoryBytes) {
+        this.execMemoryBytes = execMemoryBytes;
+        valueChanged("execMemoryBytes");
+    }
+
+    public void setDriveMemoryBytes(long driveMemoryBytes) {
+        this.driveMemoryBytes = driveMemoryBytes;
+        valueChanged("driveMemoryBytes");
+    }
+
+    public void setCompleteTasks(int completeTasks){
+        this.completeTasks = completeTasks;
+        valueChanged("completeTasks");
+    }
+
+    public long getTotalExecutorTime() {
+        return totalExecutorTime;
+    }
+
+    public void setTotalExecutorTime(long totalExecutorTime) {
+        this.totalExecutorTime = totalExecutorTime;
+        valueChanged("totalExecutorTime");
+    }
+
+    public long getExecutorMemoryOverhead() {
+        return executorMemoryOverhead;
+    }
+
+    public void setExecutorMemoryOverhead(long executorMemoryOverhead) {
+        this.executorMemoryOverhead = executorMemoryOverhead;
+        valueChanged("executorMemoryOverhead");
+    }
+
+    public long getDriverMemoryOverhead() {
+        return driverMemoryOverhead;
+    }
+
+    public void setDriverMemoryOverhead(long driverMemoryOverhead) {
+        this.driverMemoryOverhead = driverMemoryOverhead;
+        valueChanged("driverMemoryOverhead");
+    }
+
+    public int getExecutorCores() {
+        return executorCores;
+    }
+
+    public void setExecutorCores(int executorCores) {
+        this.executorCores = executorCores;
+        valueChanged("executorCores");
+    }
+
+    public int getDriverCores() {
+        return driverCores;
+    }
+
+    public void setDriverCores(int driverCores) {
+        this.driverCores = driverCores;
+        valueChanged("driverCores");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkExecutor.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkExecutor.java
new file mode 100644
index 0000000..92cb130
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkExecutor.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eglesprk_executors")
+@ColumnFamily("f")
+@Prefix("sprkexcutr")
+@Service(Constants.SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "executorId","user", "queue"})
+@Partition({"site"})
+public class SparkExecutor extends TaggedLogAPIEntity{
+
+    @Column("a")
+    private String hostPort;
+    @Column("b")
+    private int rddBlocks;
+    @Column("c")
+    private long memoryUsed;
+    @Column("d")
+    private long diskUsed;
+    @Column("e")
+    private int activeTasks = 0;
+    @Column("f")
+    private int failedTasks = 0;
+    @Column("g")
+    private int completedTasks = 0;
+    @Column("h")
+    private int totalTasks = 0;
+    @Column("i")
+    private long totalDuration = 0;
+    @Column("j")
+    private long totalInputBytes = 0;
+    @Column("k")
+    private long totalShuffleRead = 0;
+    @Column("l")
+    private long totalShuffleWrite = 0;
+    @Column("m")
+    private long maxMemory;
+    @Column("n")
+    private long startTime;
+    @Column("o")
+    private long endTime = 0;
+    @Column("p")
+    private long execMemoryBytes;
+    @Column("q")
+    private int cores;
+    @Column("r")
+    private long memoryOverhead;
+
+    public String getHostPort() {
+        return hostPort;
+    }
+
+    public void setHostPort(String hostPort) {
+        this.hostPort = hostPort;
+        this.valueChanged("hostPort");
+    }
+
+    public int getRddBlocks() {
+        return rddBlocks;
+    }
+
+    public void setRddBlocks(int rddBlocks) {
+        this.rddBlocks = rddBlocks;
+        this.valueChanged("rddBlocks");
+    }
+
+    public long getMemoryUsed() {
+        return memoryUsed;
+    }
+
+    public void setMemoryUsed(long memoryUsed) {
+        this.memoryUsed = memoryUsed;
+        this.valueChanged("memoryUsed");
+    }
+
+    public long getDiskUsed() {
+        return diskUsed;
+    }
+
+    public void setDiskUsed(long diskUsed) {
+        this.diskUsed = diskUsed;
+        this.valueChanged("diskUsed");
+    }
+
+    public int getActiveTasks() {
+        return activeTasks;
+    }
+
+    public void setActiveTasks(int activeTasks) {
+        this.activeTasks = activeTasks;
+        this.valueChanged("activeTasks");
+    }
+
+    public int getFailedTasks() {
+        return failedTasks;
+    }
+
+    public void setFailedTasks(int failedTasks) {
+        this.failedTasks = failedTasks;
+        this.valueChanged("failedTasks");
+    }
+
+    public int getCompletedTasks() {
+        return completedTasks;
+    }
+
+    public void setCompletedTasks(int completedTasks) {
+        this.completedTasks = completedTasks;
+        this.valueChanged("completedTasks");
+    }
+
+    public int getTotalTasks() {
+        return totalTasks;
+    }
+
+    public void setTotalTasks(int totalTasks) {
+        this.totalTasks = totalTasks;
+        this.valueChanged("totalTasks");
+    }
+
+    public long getTotalDuration() {
+        return totalDuration;
+    }
+
+    public void setTotalDuration(long totalDuration) {
+        this.totalDuration = totalDuration;
+        this.valueChanged("totalDuration");
+    }
+
+    public long getTotalInputBytes() {
+        return totalInputBytes;
+    }
+
+    public void setTotalInputBytes(long totalInputBytes) {
+        this.totalInputBytes = totalInputBytes;
+        this.valueChanged("totalInputBytes");
+    }
+
+    public long getTotalShuffleRead() {
+        return totalShuffleRead;
+    }
+
+    public void setTotalShuffleRead(long totalShuffleRead) {
+        this.totalShuffleRead = totalShuffleRead;
+        this.valueChanged("totalShuffleRead");
+    }
+
+    public long getTotalShuffleWrite() {
+        return totalShuffleWrite;
+    }
+
+    public void setTotalShuffleWrite(long totalShuffleWrite) {
+        this.totalShuffleWrite = totalShuffleWrite;
+        this.valueChanged("totalShuffleWrite");
+    }
+
+    public long getMaxMemory() {
+        return maxMemory;
+    }
+
+    public void setMaxMemory(long maxMemory) {
+        this.maxMemory = maxMemory;
+        this.valueChanged("maxMemory");
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        this.valueChanged("endTime");
+    }
+
+    public long getExecMemoryBytes() {
+        return execMemoryBytes;
+    }
+
+    public void setExecMemoryBytes(long execMemoryBytes) {
+        this.execMemoryBytes = execMemoryBytes;
+        this.valueChanged("execMemoryBytes");
+    }
+
+    public int getCores() {
+        return cores;
+    }
+
+    public void setCores(int cores) {
+        this.cores = cores;
+        valueChanged("cores");
+    }
+
+    public long getMemoryOverhead() {
+        return memoryOverhead;
+    }
+
+    public void setMemoryOverhead(long memoryOverhead) {
+        this.memoryOverhead = memoryOverhead;
+        valueChanged("memoryOverhead");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkJob.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkJob.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkJob.java
new file mode 100644
index 0000000..a641440
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkJob.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eglesprk_jobs")
+@ColumnFamily("f")
+@Prefix("sprkjob")
+@Service(Constants.SPARK_JOB_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId","user", "queue"})
+@Partition({"site"})
+public class SparkJob extends TaggedLogAPIEntity{
+
+    @Column("a")
+    private long  submissionTime;
+    @Column("b")
+    private long completionTime;
+    @Column("c")
+    private int numStages=0;
+    @Column("d")
+    private String status;
+    @Column("e")
+    private int numTask=0;
+    @Column("f")
+    private int numActiveTasks=0;
+    @Column("g")
+    private int numCompletedTasks=0;
+    @Column("h")
+    private int numSkippedTasks=0;
+    @Column("i")
+    private int numFailedTasks=0;
+    @Column("j")
+    private int numActiveStages=0;
+    @Column("k")
+    private int numCompletedStages=0;
+    @Column("l")
+    private int numSkippedStages=0;
+    @Column("m")
+    private int numFailedStages=0;
+
+    public long getSubmissionTime() {
+        return submissionTime;
+    }
+
+    public long getCompletionTime() {
+        return completionTime;
+    }
+
+    public int getNumStages() {
+        return numStages;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public int getNumTask() {
+        return numTask;
+    }
+
+    public int getNumActiveTasks() {
+        return numActiveTasks;
+    }
+
+    public int getNumCompletedTasks() {
+        return numCompletedTasks;
+    }
+
+    public int getNumSkippedTasks() {
+        return numSkippedTasks;
+    }
+
+    public int getNumFailedTasks() {
+        return numFailedTasks;
+    }
+
+    public int getNumActiveStages() {
+        return numActiveStages;
+    }
+
+    public int getNumCompletedStages() {
+        return numCompletedStages;
+    }
+
+    public int getNumSkippedStages() {
+        return numSkippedStages;
+    }
+
+    public int getNumFailedStages() {
+        return numFailedStages;
+    }
+
+    public void setSubmissionTime(long submissionTime) {
+        this.submissionTime = submissionTime;
+        this.valueChanged("submissionTime");
+    }
+
+    public void setCompletionTime(long completionTime) {
+        this.completionTime = completionTime;
+        this.valueChanged("completionTime");
+    }
+
+    public void setNumStages(int numStages) {
+        this.numStages = numStages;
+        this.valueChanged("numStages");
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        this.valueChanged("status");
+    }
+
+    public void setNumTask(int numTask) {
+        this.numTask = numTask;
+        this.valueChanged("numTask");
+    }
+
+    public void setNumActiveTasks(int numActiveTasks) {
+        this.numActiveTasks = numActiveTasks;
+        this.valueChanged("numActiveTasks");
+    }
+
+    public void setNumCompletedTasks(int numCompletedTasks) {
+        this.numCompletedTasks = numCompletedTasks;
+        this.valueChanged("numCompletedTasks");
+    }
+
+    public void setNumSkippedTasks(int numSkippedTasks) {
+        this.numSkippedTasks = numSkippedTasks;
+        this.valueChanged("numSkippedTasks");
+    }
+
+    public void setNumFailedTasks(int numFailedTasks) {
+        this.numFailedTasks = numFailedTasks;
+        this.valueChanged("numFailedTasks");
+    }
+
+    public void setNumActiveStages(int numActiveStages) {
+        this.numActiveStages = numActiveStages;
+        this.valueChanged("numActiveStages");
+    }
+
+    public void setNumCompletedStages(int numCompletedStages) {
+        this.numCompletedStages = numCompletedStages;
+        this.valueChanged("numCompletedStages");
+    }
+
+    public void setNumSkippedStages(int numSkippedStages) {
+        this.numSkippedStages = numSkippedStages;
+        this.valueChanged("numSkippedStages");
+    }
+
+    public void setNumFailedStages(int numFailedStages) {
+        this.numFailedStages = numFailedStages;
+        this.valueChanged("numFailedStages");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkStage.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkStage.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkStage.java
new file mode 100644
index 0000000..92714bf
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkStage.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eglesprk_stages")
+@ColumnFamily("f")
+@Prefix("sprkstage")
+@Service(Constants.SPARK_STAGE_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId", "stageId","stageAttemptId","user", "queue"})
+@Partition({"site"})
+public class SparkStage extends TaggedLogAPIEntity{
+
+    @Column("a")
+    private String status;
+    @Column("b")
+    private int numActiveTasks=0;
+    @Column("c")
+    private int numCompletedTasks=0;
+    @Column("d")
+    private int numFailedTasks=0;
+    @Column("e")
+    private long executorRunTime=0l;
+    @Column("f")
+    private long inputBytes=0l;
+    @Column("g")
+    private long inputRecords=0l;
+    @Column("h")
+    private long outputBytes=0l;
+    @Column("i")
+    private long outputRecords=0l;
+    @Column("j")
+    private long shuffleReadBytes=0l;
+    @Column("k")
+    private long shuffleReadRecords=0l;
+    @Column("l")
+    private long shuffleWriteBytes=0l;
+    @Column("m")
+    private long shuffleWriteRecords=0l;
+    @Column("n")
+    private long memoryBytesSpilled=0l;
+    @Column("o")
+    private long diskBytesSpilled=0l;
+    @Column("p")
+    private String name;
+    @Column("q")
+    private String schedulingPool;
+    @Column("r")
+    private long submitTime;
+    @Column("s")
+    private long completeTime;
+    @Column("t")
+    private int numTasks;
+    @Column("u")
+    private long executorDeserializeTime;
+    @Column("v")
+    private long resultSize;
+    @Column("w")
+    private long jvmGcTime;
+    @Column("x")
+    private long resultSerializationTime;
+
+    public String getStatus() {
+        return status;
+    }
+
+    public int getNumActiveTasks() {
+        return numActiveTasks;
+    }
+
+    public int getNumCompletedTasks() {
+        return numCompletedTasks;
+    }
+
+    public int getNumFailedTasks() {
+        return numFailedTasks;
+    }
+
+    public long getExecutorRunTime() {
+        return executorRunTime;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public long getOutputRecords() {
+        return outputRecords;
+    }
+
+    public long getShuffleReadBytes() {
+        return shuffleReadBytes;
+    }
+
+    public long getShuffleReadRecords() {
+        return shuffleReadRecords;
+    }
+
+    public long getShuffleWriteBytes() {
+        return shuffleWriteBytes;
+    }
+
+    public long getShuffleWriteRecords() {
+        return shuffleWriteRecords;
+    }
+
+    public long getMemoryBytesSpilled() {
+        return memoryBytesSpilled;
+    }
+
+    public long getDiskBytesSpilled() {
+        return diskBytesSpilled;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getSchedulingPool() {
+        return schedulingPool;
+    }
+
+    public long getSubmitTime() {
+        return submitTime;
+    }
+
+    public long getCompleteTime() {
+        return completeTime;
+    }
+
+    public int getNumTasks() {
+        return numTasks;
+    }
+
+    public long getExecutorDeserializeTime() {
+        return executorDeserializeTime;
+    }
+
+    public long getResultSize() {
+        return resultSize;
+    }
+
+    public long getJvmGcTime() {
+        return jvmGcTime;
+    }
+
+    public long getResultSerializationTime() {
+        return resultSerializationTime;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        this.valueChanged("status");
+    }
+
+    public void setNumActiveTasks(int numActiveTasks) {
+        this.numActiveTasks = numActiveTasks;
+        this.valueChanged("numActiveTasks");
+    }
+
+    public void setNumCompletedTasks(int numCompletedTasks) {
+        this.numCompletedTasks = numCompletedTasks;
+        this.valueChanged("numCompletedTasks");
+    }
+
+    public void setNumFailedTasks(int numFailedTasks) {
+        this.numFailedTasks = numFailedTasks;
+        this.valueChanged("numFailedTasks");
+    }
+
+    public void setExecutorRunTime(long executorRunTime) {
+        this.executorRunTime = executorRunTime;
+        this.valueChanged("executorRunTime");
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+        this.valueChanged("inputBytes");
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+        this.valueChanged("inputRecords");
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+        this.valueChanged("outputBytes");
+    }
+
+    public void setOutputRecords(long outputRecords) {
+        this.outputRecords = outputRecords;
+        this.valueChanged("outputRecords");
+    }
+
+    public void setShuffleReadBytes(long shuffleReadBytes) {
+        this.shuffleReadBytes = shuffleReadBytes;
+        this.valueChanged("shuffleReadBytes");
+    }
+
+    public void setShuffleReadRecords(long shuffleReadRecords) {
+        this.shuffleReadRecords = shuffleReadRecords;
+        this.valueChanged("shuffleReadRecords");
+    }
+
+    public void setShuffleWriteBytes(long shuffleWriteBytes) {
+        this.shuffleWriteBytes = shuffleWriteBytes;
+        this.valueChanged("shuffleWriteBytes");
+    }
+
+    public void setShuffleWriteRecords(long shuffleWriteRecords) {
+        this.shuffleWriteRecords = shuffleWriteRecords;
+        this.valueChanged("shuffleWriteRecords");
+    }
+
+    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+        this.memoryBytesSpilled = memoryBytesSpilled;
+        this.valueChanged("memoryBytesSpilled");
+    }
+
+    public void setDiskBytesSpilled(long diskBytesSpilled) {
+        this.diskBytesSpilled = diskBytesSpilled;
+        this.valueChanged("diskBytesSpilled");
+    }
+
+    public void setName(String name) {
+        this.name = name;
+        this.valueChanged("name");
+    }
+
+    public void setSchedulingPool(String schedulingPool) {
+        this.schedulingPool = schedulingPool;
+        this.valueChanged("schedulingPool");
+    }
+
+    public void setSubmitTime(long submitTime) {
+        this.submitTime = submitTime;
+        this.valueChanged("submitTime");
+    }
+
+    public void setCompleteTime(long completeTime) {
+        this.completeTime = completeTime;
+        this.valueChanged("completeTime");
+    }
+
+    public void setNumTasks(int numTasks) {
+        this.numTasks = numTasks;
+        valueChanged("numTasks");
+    }
+
+    public void setExecutorDeserializeTime(long executorDeserializeTime) {
+        this.executorDeserializeTime = executorDeserializeTime;
+        valueChanged("executorDeserializeTime");
+    }
+
+    public void setResultSize(long resultSize) {
+        this.resultSize = resultSize;
+        valueChanged("resultSize");
+    }
+
+    public void setJvmGcTime(long jvmGcTime) {
+        this.jvmGcTime = jvmGcTime;
+        valueChanged("jvmGcTime");
+    }
+
+    public void setResultSerializationTime(long resultSerializationTime) {
+        this.resultSerializationTime = resultSerializationTime;
+        valueChanged("resultSerializationTime");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkTask.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkTask.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkTask.java
new file mode 100644
index 0000000..af9ed21
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/entity/SparkTask.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.apache.eagle.jpm.util.Constants;
+
+@Table("eglesprk_tasks")
+@ColumnFamily("f")
+@Prefix("sprktask")
+@Service(Constants.SPARK_TASK_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sprkAppId", "sprkAppAttemptId", "sprkAppName", "normSprkAppName", "jobId", "jobName", "stageId","stageAttemptId","taskIndex","taskAttemptId","user", "queue"})
+@Partition({"site"})
+public class SparkTask extends TaggedLogAPIEntity{
+
+    @Column("a")
+    private int taskId;
+    @Column("b")
+    private long launchTime;
+    @Column("c")
+    private String executorId;
+    @Column("d")
+    private String host;
+    @Column("e")
+    private String taskLocality;
+    @Column("f")
+    private boolean speculative;
+    @Column("g")
+    private long executorDeserializeTime;
+    @Column("h")
+    private long executorRunTime;
+    @Column("i")
+    private long resultSize;
+    @Column("j")
+    private long jvmGcTime;
+    @Column("k")
+    private long resultSerializationTime;
+    @Column("l")
+    private long memoryBytesSpilled;
+    @Column("m")
+    private long diskBytesSpilled;
+    @Column("n")
+    private long inputBytes;
+    @Column("o")
+    private long inputRecords;
+    @Column("p")
+    private long outputBytes;
+    @Column("q")
+    private long outputRecords;
+    @Column("r")
+    private long shuffleReadRemoteBytes;
+    @Column("x")
+    private long shuffleReadLocalBytes;
+    @Column("s")
+    private long shuffleReadRecords;
+    @Column("t")
+    private long shuffleWriteBytes;
+    @Column("u")
+    private long shuffleWriteRecords;
+    @Column("v")
+    private boolean failed;
+
+    public int getTaskId() {
+        return taskId;
+    }
+
+    public long getLaunchTime() {
+        return launchTime;
+    }
+
+    public String getExecutorId() {
+        return executorId;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public String getTaskLocality() {
+        return taskLocality;
+    }
+
+    public boolean isSpeculative() {
+        return speculative;
+    }
+
+    public long getExecutorDeserializeTime() {
+        return executorDeserializeTime;
+    }
+
+    public long getExecutorRunTime() {
+        return executorRunTime;
+    }
+
+    public long getResultSize() {
+        return resultSize;
+    }
+
+    public long getJvmGcTime() {
+        return jvmGcTime;
+    }
+
+    public long getResultSerializationTime() {
+        return resultSerializationTime;
+    }
+
+    public long getMemoryBytesSpilled() {
+        return memoryBytesSpilled;
+    }
+
+    public long getDiskBytesSpilled() {
+        return diskBytesSpilled;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public long getOutputRecords() {
+        return outputRecords;
+    }
+
+    public long getShuffleReadRecords() {
+        return shuffleReadRecords;
+    }
+
+    public long getShuffleWriteBytes() {
+        return shuffleWriteBytes;
+    }
+
+    public long getShuffleWriteRecords() {
+        return shuffleWriteRecords;
+    }
+
+    public boolean isFailed() {
+        return failed;
+    }
+
+    public long getShuffleReadRemoteBytes() {
+        return shuffleReadRemoteBytes;
+    }
+
+    public long getShuffleReadLocalBytes() {
+        return shuffleReadLocalBytes;
+    }
+
+    public void setFailed(boolean failed) {
+        this.failed = failed;
+        valueChanged("failed");
+    }
+
+    public void setTaskId(int taskId) {
+        this.taskId = taskId;
+        valueChanged("taskId");
+    }
+
+    public void setLaunchTime(long launchTime) {
+        this.launchTime = launchTime;
+        valueChanged("launchTime");
+    }
+
+    public void setExecutorId(String executorId) {
+        this.executorId = executorId;
+        valueChanged("executorId");
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+        this.valueChanged("host");
+    }
+
+    public void setTaskLocality(String taskLocality) {
+        this.taskLocality = taskLocality;
+        this.valueChanged("taskLocality");
+    }
+
+    public void setSpeculative(boolean speculative) {
+        this.speculative = speculative;
+        this.valueChanged("speculative");
+    }
+
+    public void setExecutorDeserializeTime(long executorDeserializeTime) {
+        this.executorDeserializeTime = executorDeserializeTime;
+        this.valueChanged("executorDeserializeTime");
+    }
+
+    public void setExecutorRunTime(long executorRunTime) {
+        this.executorRunTime = executorRunTime;
+        this.valueChanged("executorRunTime");
+    }
+
+    public void setResultSize(long resultSize) {
+        this.resultSize = resultSize;
+        this.valueChanged("resultSize");
+    }
+
+    public void setJvmGcTime(long jvmGcTime) {
+        this.jvmGcTime = jvmGcTime;
+        this.valueChanged("jvmGcTime");
+    }
+
+    public void setResultSerializationTime(long resultSerializationTime) {
+        this.resultSerializationTime = resultSerializationTime;
+        this.valueChanged("resultSerializationTime");
+    }
+
+    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+        this.memoryBytesSpilled = memoryBytesSpilled;
+        this.valueChanged("memoryBytesSpilled");
+    }
+
+    public void setDiskBytesSpilled(long diskBytesSpilled) {
+        this.diskBytesSpilled = diskBytesSpilled;
+        this.valueChanged("diskBytesSpilled");
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+        this.valueChanged("inputBytes");
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+        this.valueChanged("inputRecords");
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+        this.valueChanged("outputBytes");
+    }
+
+    public void setOutputRecords(long outputRecords) {
+        this.outputRecords = outputRecords;
+        this.valueChanged("outputRecords");
+    }
+
+
+
+    public void setShuffleReadRecords(long shuffleReadRecords) {
+        this.shuffleReadRecords = shuffleReadRecords;
+        this.valueChanged("shuffleReadRecords");
+    }
+
+    public void setShuffleWriteBytes(long shuffleWriteBytes) {
+        this.shuffleWriteBytes = shuffleWriteBytes;
+        this.valueChanged("shuffleWriteBytes");
+    }
+
+    public void setShuffleWriteRecords(long shuffleWriteRecords) {
+        this.shuffleWriteRecords = shuffleWriteRecords;
+        this.valueChanged("shuffleWriteRecords");
+    }
+
+    public void setShuffleReadRemoteBytes(long shuffleReadRemoteBytes) {
+        this.shuffleReadRemoteBytes = shuffleReadRemoteBytes;
+        this.valueChanged("shuffleReadRemoteBytes");
+    }
+
+    public void setShuffleReadLocalBytes(long shuffleReadLocalBytes) {
+        this.shuffleReadLocalBytes = shuffleReadLocalBytes;
+        this.valueChanged("shuffleReadLocalBytes");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml
new file mode 100644
index 0000000..97be7ec
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>eagle-jpm-parent</artifactId>
+        <groupId>org.apache.eagle</groupId>
+        <version>0.5.0-incubating-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>eagle-jpm-mr-history</artifactId>
+    <name>eagle-jpm-mr-history</name>
+    <url>http://maven.apache.org</url>
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-stream-process-api</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.wso2.orbit.com.lmax</groupId>
+                    <artifactId>disruptor</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-stream-process-base</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.wso2.orbit.com.lmax</groupId>
+                    <artifactId>disruptor</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-annotations</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-app</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptor>src/assembly/eagle-jpm-mr-history-assembly.xml</descriptor>
+                    <finalName>eagle-jpm-mr-history-${project.version}</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <tarLongFileMode>posix</tarLongFileMode>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/assembly/eagle-jpm-mr-history-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/assembly/eagle-jpm-mr-history-assembly.xml b/eagle-jpm/eagle-jpm-mr-history/src/assembly/eagle-jpm-mr-history-assembly.xml
new file mode 100644
index 0000000..cf6d108
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/assembly/eagle-jpm-mr-history-assembly.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+    <id>assembly</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>false</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+            <!--includes>
+                <include>org.apache.hadoop:hadoop-common</include>
+                <include>org.apache.hadoop:hadoop-hdfs</include>
+                <include>org.apache.hadoop:hadoop-client</include>
+                <include>org.apache.hadoop:hadoop-auth</include>
+                <include>org.apache.eagle:eagle-stream-process-api</include>
+                <include>org.apache.eagle:eagle-stream-process-base</include>
+                <include>org.jsoup:jsoup</include>
+            </includes-->
+            <excludes>
+                <exclude>org.wso2.orbit.com.lmax:disruptor</exclude>
+                <exclude>asm:asm</exclude>
+                <exclude>org.apache.storm:storm-core</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
+    <fileSets>
+        <fileSet>
+            <directory>${project.build.outputDirectory}/</directory>
+            <outputDirectory>/</outputDirectory>
+            <!--<includes>-->
+                <!--<include>*.conf</include>-->
+                <!--<include>*.xml</include>-->
+                <!--<include>*.properties</include>-->
+                <!--<include>*.config</include>-->
+                <!--<include>classes/META-INF/*</include>-->
+            <!--</includes>-->
+
+            <excludes>
+                <exclude>*.yaml</exclude>
+            </excludes>
+        </fileSet>
+    </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
new file mode 100644
index 0000000..7c0530d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
+import org.apache.eagle.jpm.mr.history.storm.HistoryJobProgressBolt;
+import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class MRHistoryJobMain {
+    public static void main(String []args) {
+        try {
+            //1. trigger init conf
+            JHFConfigManager jhfConfigManager = JHFConfigManager.getInstance(args);
+
+            //2. init JobHistoryContentFilter
+            JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile();
+            List<String> confKeyPatterns = jhfConfigManager.getConfig().getStringList("MRConfigureKeys");
+            confKeyPatterns.add(JPAConstants.JobConfiguration.CASCADING_JOB);
+            confKeyPatterns.add(JPAConstants.JobConfiguration.HIVE_JOB);
+            confKeyPatterns.add(JPAConstants.JobConfiguration.PIG_JOB);
+            confKeyPatterns.add(JPAConstants.JobConfiguration.SCOOBI_JOB);
+
+            for (String key : confKeyPatterns) {
+                builder.includeJobKeyPatterns(Pattern.compile(key));
+            }
+            JobHistoryContentFilter filter = builder.build();
+
+            //3. init topology
+            TopologyBuilder topologyBuilder = new TopologyBuilder();
+            String topologyName = "mrHistoryJobTopology";
+            String spoutName = "mrHistoryJobExecutor";
+            String boltName = "updateProcessTime";
+            int parallelism = jhfConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
+            int tasks = jhfConfigManager.getConfig().getInt("envContextConfig.tasks." + spoutName);
+            if (parallelism > tasks) {
+                parallelism = tasks;
+            }
+            topologyBuilder.setSpout(
+                    spoutName,
+                    new JobHistorySpout(filter, jhfConfigManager),
+                    parallelism
+            ).setNumTasks(tasks);
+            topologyBuilder.setBolt(boltName, new HistoryJobProgressBolt(spoutName, jhfConfigManager), 1).setNumTasks(1).allGrouping(spoutName);
+
+            backtype.storm.Config config = new backtype.storm.Config();
+            config.setNumWorkers(jhfConfigManager.getConfig().getInt("envContextConfig.workers"));
+            config.put(Config.TOPOLOGY_DEBUG, true);
+            if (!jhfConfigManager.getEnv().equals("local")) {
+                //cluster mode
+                //parse conf here
+                StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
+            } else {
+                //local mode
+                LocalCluster cluster = new LocalCluster();
+                cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}