You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:36 UTC

[40/52] [abbrv] incubator-eagle git commit: Update spark history job feeder config & refactor the code

Update spark history job feeder config & refactor the code

Author: Qingwen Zhao <qi...@gmail.com>

Closes #416 from qingwen220/sparkHist.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3110c72e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3110c72e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3110c72e

Branch: refs/heads/master
Commit: 3110c72e47f697f5c59b5f7d6559d527cf25db3a
Parents: 8774b85
Author: Qingwen Zhao <qi...@gmail.com>
Authored: Wed Sep 7 10:36:24 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Wed Sep 7 10:36:24 2016 +0800

----------------------------------------------------------------------
 .../environment/impl/StormExecutionRuntime.java |  32 +-
 .../apache/eagle/jpm/spark/crawl/EventType.java |  24 -
 .../jpm/spark/crawl/JHFInputStreamReader.java   |  24 -
 .../eagle/jpm/spark/crawl/JHFParserBase.java    |  29 -
 .../jpm/spark/crawl/JHFSparkEventReader.java    | 713 -------------------
 .../eagle/jpm/spark/crawl/JHFSparkParser.java   |  73 --
 .../jpm/spark/crawl/SparkApplicationInfo.java   |  69 --
 .../SparkFilesystemInputStreamReaderImpl.java   |  53 --
 .../running/entities/JPMEntityRepository.java   |  33 +
 .../jpm/spark/running/entities/JobConfig.java   |  26 +
 .../spark/running/entities/SparkAppEntity.java  | 476 +++++++++++++
 .../running/entities/SparkExecutorEntity.java   | 233 ++++++
 .../spark/running/entities/SparkJobEntity.java  | 191 +++++
 .../running/entities/SparkStageEntity.java      | 299 ++++++++
 .../spark/running/entities/SparkTaskEntity.java | 290 ++++++++
 .../spark/history/SparkHistoryJobAppConfig.java |   4 -
 .../history/crawl/JHFInputStreamReader.java     |  24 +
 .../jpm/spark/history/crawl/JHFParserBase.java  |  29 +
 .../history/crawl/JHFSparkEventReader.java      | 713 +++++++++++++++++++
 .../jpm/spark/history/crawl/JHFSparkParser.java |  73 ++
 .../history/crawl/SparkApplicationInfo.java     |  69 ++
 .../SparkFilesystemInputStreamReaderImpl.java   |  53 ++
 .../status/JobHistoryZKStateManager.java        |   7 +-
 .../history/storm/SparkHistoryJobParseBolt.java |  13 +-
 .../history/storm/SparkHistoryJobSpout.java     |   5 +-
 ...spark.history.SparkHistoryJobAppProvider.xml |  18 -
 .../src/main/resources/application.conf         |   9 +-
 .../running/entities/JPMEntityRepository.java   |  30 -
 .../jpm/spark/running/entities/JobConfig.java   |  25 -
 .../spark/running/entities/SparkAppEntity.java  | 475 ------------
 .../running/entities/SparkExecutorEntity.java   | 232 ------
 .../spark/running/entities/SparkJobEntity.java  | 190 -----
 .../running/entities/SparkStageEntity.java      | 298 --------
 .../spark/running/entities/SparkTaskEntity.java | 289 --------
 .../running/parser/SparkApplicationParser.java  |   8 +-
 .../src/main/resources/application.conf         |   6 +-
 .../apache/eagle/jpm/util/SparkEventType.java   |  25 +
 .../util/resourcefetch/RMResourceFetcher.java   |   2 +-
 38 files changed, 2575 insertions(+), 2587 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 04cc19b..e37e8f2 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -34,13 +34,13 @@ import scala.Int;
 import storm.trident.spout.RichSpoutBatchExecutor;
 
 public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> {
-    private final static Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
     private static LocalCluster _localCluster;
 
     private StormEnvironment environment;
 
-    private static LocalCluster getLocalCluster(){
-        if(_localCluster == null){
+    private static LocalCluster getLocalCluster() {
+        if (_localCluster == null) {
             _localCluster = new LocalCluster();
         }
         return _localCluster;
@@ -56,13 +56,13 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
         return this.environment;
     }
 
-    private final static String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
-    private final static String STORM_NIMBUS_HOST_DEFAULT = "localhost";
-    private final static Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
-    private final static String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
+    private static final String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
+    private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost";
+    private static final Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
+    private static final String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
     private static final String WORKERS = "workers";
 
-    public backtype.storm.Config getStormConfig(){
+    public backtype.storm.Config getStormConfig() {
         backtype.storm.Config conf = new backtype.storm.Config();
         conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
         conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8));
@@ -71,14 +71,14 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
         conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
         conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
         String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
-        if(environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
+        if (environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
             nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
             LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
         } else {
             LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
         }
         Integer nimbusThriftPort =  STORM_NIMBUS_THRIFT_DEFAULT;
-        if(environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
+        if (environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
             nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
             LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
         } else {
@@ -94,15 +94,15 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
     }
 
     @Override
-    public void start(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config){
+    public void start(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
         String topologyName = config.getString("appId");
-        Preconditions.checkNotNull(topologyName,"[appId] is required by null for "+executor.getClass().getCanonicalName());
+        Preconditions.checkNotNull(topologyName,"[appId] is required by null for " + executor.getClass().getCanonicalName());
         StormTopology topology = executor.execute(config, environment);
-        LOG.info("Starting {} ({}), mode: {}",topologyName,executor.getClass().getCanonicalName(), config.getString("mode"));
+        LOG.info("Starting {} ({}), mode: {}",topologyName, executor.getClass().getCanonicalName(), config.getString("mode"));
         Config conf = getStormConfig();
-        if(ApplicationEntity.Mode.CLUSTER.name().equalsIgnoreCase(config.getString("mode"))){
+        if (ApplicationEntity.Mode.CLUSTER.name().equalsIgnoreCase(config.getString("mode"))) {
             String jarFile = config.hasPath("jarPath") ? config.getString("jarPath") : null;
-            if(jarFile == null){
+            if (jarFile == null) {
                 jarFile = DynamicJarPathFinder.findPath(executor.getClass());
             }
             synchronized (StormExecutionRuntime.class) {
@@ -129,7 +129,7 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
     public void stop(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
         String appId = config.getString("appId");
         LOG.info("Stopping topology {} ..." + appId);
-        if(config.getString("mode") == ApplicationEntity.Mode.CLUSTER.name()){
+        if (config.getString("mode") == ApplicationEntity.Mode.CLUSTER.name()) {
             Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig()).getClient();
             try {
                 stormClient.killTopology(appId);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java
deleted file mode 100644
index 1ba15b7..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-public enum EventType {
-    SparkListenerBlockManagerAdded, SparkListenerEnvironmentUpdate, SparkListenerApplicationStart,
-    SparkListenerExecutorAdded, SparkListenerJobStart,SparkListenerStageSubmitted, SparkListenerTaskStart,SparkListenerBlockManagerRemoved,
-    SparkListenerTaskEnd, SparkListenerStageCompleted, SparkListenerJobEnd, SparkListenerApplicationEnd,SparkListenerExecutorRemoved
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
deleted file mode 100644
index 8a8d0db..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import java.io.InputStream;
-
-public interface JHFInputStreamReader {
-    void read(InputStream is) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
deleted file mode 100644
index 62ba7d9..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import java.io.InputStream;
-
-public interface JHFParserBase {
-    /**
-     * this method will ensure to close the inputStream.
-     * @param is
-     * @throws Exception
-     */
-    void parse(InputStream is) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
deleted file mode 100644
index 22b715a..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
+++ /dev/null
@@ -1,713 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.eagle.jpm.spark.entity.*;
-import org.apache.eagle.jpm.util.*;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.service.client.EagleServiceClientException;
-import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-public class JHFSparkEventReader {
-    private static final Logger LOG = LoggerFactory.getLogger(JHFSparkEventReader.class);
-
-    private static final int FLUSH_LIMIT = 500;
-    private long firstTaskLaunchTime;
-    private long lastEventTime;
-
-    private Map<String, SparkExecutor> executors;
-    private SparkApp app;
-    private Map<Integer, SparkJob> jobs;
-    private Map<String, SparkStage> stages;
-    private Map<Integer, Set<String>> jobStageMap;
-    private Map<Long, SparkTask> tasks;
-    private EagleServiceClientImpl client;
-    private Map<String, Map<Integer, Boolean>> stageTaskStatusMap;
-
-    private List<TaggedLogAPIEntity> createEntities;
-
-    private Config conf;
-
-    public JHFSparkEventReader(Map<String, String> baseTags, SparkApplicationInfo info) {
-        app = new SparkApp();
-        app.setTags(new HashMap<String, String>(baseTags));
-        app.setYarnState(info.getState());
-        app.setYarnStatus(info.getFinalStatus());
-        createEntities = new ArrayList<>();
-        jobs = new HashMap<Integer, SparkJob>();
-        stages = new HashMap<String, SparkStage>();
-        jobStageMap = new HashMap<Integer, Set<String>>();
-        tasks = new HashMap<Long, SparkTask>();
-        executors = new HashMap<String, SparkExecutor>();
-        stageTaskStatusMap = new HashMap<>();
-        conf = ConfigFactory.load();
-        this.initiateClient();
-    }
-
-    public SparkApp getApp() {
-        return this.app;
-    }
-
-    public void read(JSONObject eventObj) {
-        String eventType = (String) eventObj.get("Event");
-        if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationStart.toString())) {
-            handleAppStarted(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString())) {
-            handleEnvironmentSet(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorAdded.toString())) {
-            handleExecutorAdd(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerAdded.toString())) {
-            handleBlockManagerAdd(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobStart.toString())) {
-            handleJobStart(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageSubmitted.toString())) {
-            handleStageSubmit(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskStart.toString())) {
-            handleTaskStart(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskEnd.toString())) {
-            handleTaskEnd(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageCompleted.toString())) {
-            handleStageComplete(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobEnd.toString())) {
-            handleJobEnd(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorRemoved.toString())) {
-            handleExecutorRemoved(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationEnd.toString())) {
-            handleAppEnd(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerRemoved.toString())) {
-            //nothing to do now
-        } else {
-            LOG.info("Not registered event type:" + eventType);
-        }
-
-    }
-
-    private void handleEnvironmentSet(JSONObject event) {
-        app.setConfig(new JobConfig());
-        JSONObject sparkProps = (JSONObject) event.get("Spark Properties");
-
-        String[] additionalJobConf = conf.getString("basic.jobConf.additional.info").split(",\\s*");
-        String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port",
-            "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory",
-            "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"};
-        String[] jobConf = (String[])ArrayUtils.addAll(additionalJobConf, props);
-        for (String prop : jobConf) {
-            if (sparkProps.containsKey(prop)) {
-                app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop));
-            }
-        }
-    }
-
-    private Object getConfigVal(JobConfig config, String configName, String type) {
-        if (config.getConfig().containsKey(configName)) {
-            Object val = config.getConfig().get(configName);
-            if (type.equalsIgnoreCase(Integer.class.getName())) {
-                return Integer.parseInt((String) val);
-            } else {
-                return val;
-            }
-        } else {
-            if (type.equalsIgnoreCase(Integer.class.getName())) {
-                return conf.getInt("spark.defaultVal." + configName);
-            } else {
-                return conf.getString("spark.defaultVal." + configName);
-            }
-        }
-    }
-
-    private boolean isClientMode(JobConfig config) {
-        return config.getConfig().get("spark.master").equalsIgnoreCase("yarn-client");
-    }
-
-    private void handleAppStarted(JSONObject event) {
-        //need update all entities tag before app start
-        List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
-        entities.addAll(this.executors.values());
-        entities.add(this.app);
-
-        long appStartTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
-        for (TaggedLogAPIEntity entity : entities) {
-            entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), JSONUtils.getString(event, "App ID"));
-            entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), JSONUtils.getString(event, "App Name"));
-            // In yarn-client mode, attemptId is not available in the log, so we set attemptId = 1.
-            String attemptId = isClientMode(this.app.getConfig()) ? "1" : JSONUtils.getString(event, "App Attempt ID");
-            entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), attemptId);
-            // the second argument of getNormalizeName() is changed to null because the original code contains sensitive text
-            // original second argument looks like: this.app.getConfig().getConfig().get("xxx"), "xxx" is the sensitive text
-            entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), this.getNormalizedName(JSONUtils.getString(event, "App Name"), null));
-            entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), JSONUtils.getString(event, "User"));
-
-            entity.setTimestamp(appStartTime);
-        }
-
-        this.app.setStartTime(appStartTime);
-        this.lastEventTime = appStartTime;
-    }
-
-    private void handleExecutorAdd(JSONObject event) {
-        String executorID = (String) event.get("Executor ID");
-        long executorAddTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
-        this.lastEventTime = executorAddTime;
-        SparkExecutor executor = this.initiateExecutor(executorID, executorAddTime);
-
-        JSONObject executorInfo = JSONUtils.getJSONObject(event, "Executor Info");
-
-    }
-
-    private void handleBlockManagerAdd(JSONObject event) {
-        long maxMemory = JSONUtils.getLong(event, "Maximum Memory");
-        long timestamp = JSONUtils.getLong(event, "Timestamp", lastEventTime);
-        this.lastEventTime = timestamp;
-        JSONObject blockInfo = JSONUtils.getJSONObject(event, "Block Manager ID");
-        String executorID = JSONUtils.getString(blockInfo, "Executor ID");
-        String hostAndPort = JSONUtils.getString(blockInfo, "Host") + ":" + JSONUtils.getLong(blockInfo, "Port");
-
-        SparkExecutor executor = this.initiateExecutor(executorID, timestamp);
-        executor.setMaxMemory(maxMemory);
-        executor.setHostPort(hostAndPort);
-    }
-
-    private void handleTaskStart(JSONObject event) {
-        this.initializeTask(event);
-    }
-
-    private void handleTaskEnd(JSONObject event) {
-        JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
-        long taskId = JSONUtils.getLong(taskInfo, "Task ID");
-        SparkTask task = tasks.get(taskId);
-        if (task == null) {
-            return;
-        }
-
-        task.setFailed(JSONUtils.getBoolean(taskInfo, "Failed"));
-        JSONObject taskMetrics = JSONUtils.getJSONObject(event, "Task Metrics");
-        if (null != taskMetrics) {
-            task.setExecutorDeserializeTime(JSONUtils.getLong(taskMetrics, "Executor Deserialize Time", lastEventTime));
-            task.setExecutorRunTime(JSONUtils.getLong(taskMetrics, "Executor Run Time", lastEventTime));
-            task.setJvmGcTime(JSONUtils.getLong(taskMetrics, "JVM GC Time", lastEventTime));
-            task.setResultSize(JSONUtils.getLong(taskMetrics, "Result Size"));
-            task.setResultSerializationTime(JSONUtils.getLong(taskMetrics, "Result Serialization Time", lastEventTime));
-            task.setMemoryBytesSpilled(JSONUtils.getLong(taskMetrics, "Memory Bytes Spilled"));
-            task.setDiskBytesSpilled(JSONUtils.getLong(taskMetrics, "Disk Bytes Spilled"));
-
-            JSONObject inputMetrics = JSONUtils.getJSONObject(taskMetrics, "Input Metrics");
-            if (null != inputMetrics) {
-                task.setInputBytes(JSONUtils.getLong(inputMetrics, "Bytes Read"));
-                task.setInputRecords(JSONUtils.getLong(inputMetrics, "Records Read"));
-            }
-
-            JSONObject outputMetrics = JSONUtils.getJSONObject(taskMetrics, "Output Metrics");
-            if (null != outputMetrics) {
-                task.setOutputBytes(JSONUtils.getLong(outputMetrics, "Bytes Written"));
-                task.setOutputRecords(JSONUtils.getLong(outputMetrics, "Records Written"));
-            }
-
-            JSONObject shuffleWriteMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Write Metrics");
-            if (null != shuffleWriteMetrics) {
-                task.setShuffleWriteBytes(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Bytes Written"));
-                task.setShuffleWriteRecords(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Records Written"));
-            }
-
-            JSONObject shuffleReadMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Read Metrics");
-            if (null != shuffleReadMetrics) {
-                task.setShuffleReadLocalBytes(JSONUtils.getLong(shuffleReadMetrics, "Local Bytes Read"));
-                task.setShuffleReadRemoteBytes(JSONUtils.getLong(shuffleReadMetrics, "Remote Bytes Read"));
-                task.setShuffleReadRecords(JSONUtils.getLong(shuffleReadMetrics, "Total Records Read"));
-            }
-        } else {
-            //for tasks success without task metrics, save in the end if no other information
-            if (!task.isFailed()) {
-                return;
-            }
-        }
-
-        aggregateToStage(task);
-        aggregateToExecutor(task);
-        tasks.remove(taskId);
-        this.flushEntities(task, false);
-    }
-
-
-    private SparkTask initializeTask(JSONObject event) {
-        SparkTask task = new SparkTask();
-        task.setTags(new HashMap<>(this.app.getTags()));
-        task.setTimestamp(app.getTimestamp());
-
-        task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage ID")));
-        task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage Attempt ID")));
-
-        JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
-        long taskId = JSONUtils.getLong(taskInfo, "Task ID");
-        task.setTaskId(taskId);
-
-        task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), Long.toString(JSONUtils.getLong(taskInfo, "Index")));
-        task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), Integer.toString(JSONUtils.getInt(taskInfo, "Attempt")));
-        long launchTime = JSONUtils.getLong(taskInfo, "Launch Time", lastEventTime);
-        this.lastEventTime = launchTime;
-        if (taskId == 0) {
-            this.setFirstTaskLaunchTime(launchTime);
-        }
-        task.setLaunchTime(launchTime);
-        task.setExecutorId(JSONUtils.getString(taskInfo, "Executor ID"));
-        task.setHost(JSONUtils.getString(taskInfo, "Host"));
-        task.setTaskLocality(JSONUtils.getString(taskInfo, "Locality"));
-        task.setSpeculative(JSONUtils.getBoolean(taskInfo, "Speculative"));
-
-        tasks.put(task.getTaskId(), task);
-        return task;
-    }
-
-    private void setFirstTaskLaunchTime(long launchTime) {
-        this.firstTaskLaunchTime = launchTime;
-    }
-
-    private void handleJobStart(JSONObject event) {
-        SparkJob job = new SparkJob();
-        job.setTags(new HashMap<>(this.app.getTags()));
-        job.setTimestamp(app.getTimestamp());
-
-        int jobId = JSONUtils.getInt(event, "Job ID");
-        job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
-        long submissionTime = JSONUtils.getLong(event, "Submission Time", lastEventTime);
-        job.setSubmissionTime(submissionTime);
-        this.lastEventTime = submissionTime;
-
-        //for complete application, no active stages/tasks
-        job.setNumActiveStages(0);
-        job.setNumActiveTasks(0);
-
-        this.jobs.put(jobId, job);
-        this.jobStageMap.put(jobId, new HashSet<String>());
-
-        JSONArray stages = JSONUtils.getJSONArray(event, "Stage Infos");
-        int stagesSize = (stages == null ? 0 : stages.size());
-        job.setNumStages(stagesSize);
-        for (int i = 0; i < stagesSize; i++) {
-            JSONObject stageInfo = (JSONObject) stages.get(i);
-            int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
-            int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
-            String stageName = JSONUtils.getString(stageInfo, "Stage Name");
-            int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks");
-            this.initiateStage(jobId, stageId, stageAttemptId, stageName, numTasks);
-        }
-    }
-
-    private void handleStageSubmit(JSONObject event) {
-        JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
-        int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
-        int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
-        String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
-        stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>());
-
-        if (!stages.containsKey(key)) {
-            //may be further attempt for one stage
-            String baseAttempt = this.generateStageKey(Integer.toString(stageId), "0");
-            if (stages.containsKey(baseAttempt)) {
-                SparkStage stage = stages.get(baseAttempt);
-                String jobId = stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString());
-
-                String stageName = JSONUtils.getString(event, "Stage Name");
-                int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks");
-                this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks);
-            }
-        }
-    }
-
-    private void handleStageComplete(JSONObject event) {
-        JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
-        int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
-        int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
-        String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
-        SparkStage stage = stages.get(key);
-
-        // If "Submission Time" is not available, use the "Launch Time" of "Task ID" = 0.
-        Long submissionTime = JSONUtils.getLong(stageInfo, "Submission Time", firstTaskLaunchTime);
-
-        stage.setSubmitTime(submissionTime);
-
-        long completeTime = JSONUtils.getLong(stageInfo, "Completion Time", lastEventTime);
-        stage.setCompleteTime(completeTime);
-        this.lastEventTime = completeTime;
-
-        if (stageInfo != null && stageInfo.containsKey("Failure Reason")) {
-            stage.setStatus(SparkEntityConstant.SparkStageStatus.FAILED.toString());
-        } else {
-            stage.setStatus(SparkEntityConstant.SparkStageStatus.COMPLETE.toString());
-        }
-    }
-
-    private void handleExecutorRemoved(JSONObject event) {
-        String executorID = JSONUtils.getString(event, "Executor ID");
-        SparkExecutor executor = executors.get(executorID);
-        long removedTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
-        executor.setEndTime(removedTime);
-        this.lastEventTime = removedTime;
-    }
-
-    private void handleJobEnd(JSONObject event) {
-        int jobId = JSONUtils.getInt(event, "Job ID");
-        SparkJob job = jobs.get(jobId);
-
-        long completionTime = JSONUtils.getLong(event, "Completion Time", lastEventTime);
-        job.setCompletionTime(completionTime);
-        this.lastEventTime = completionTime;
-
-        JSONObject jobResult = JSONUtils.getJSONObject(event, "Job Result");
-        String result = JSONUtils.getString(jobResult, "Result");
-        if (result.equalsIgnoreCase("JobSucceeded")) {
-            job.setStatus(SparkEntityConstant.SparkJobStatus.SUCCEEDED.toString());
-        } else {
-            job.setStatus(SparkEntityConstant.SparkJobStatus.FAILED.toString());
-        }
-    }
-
-    private void handleAppEnd(JSONObject event) {
-        long endTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
-        app.setEndTime(endTime);
-        this.lastEventTime = endTime;
-    }
-
-    public void clearReader() throws Exception {
-        //clear tasks
-        for (SparkTask task : tasks.values()) {
-            LOG.info("Task {} does not have result or no task metrics.", task.getTaskId());
-            task.setFailed(true);
-            aggregateToStage(task);
-            aggregateToExecutor(task);
-            this.flushEntities(task, false);
-        }
-
-        List<SparkStage> needStoreStages = new ArrayList<>();
-        for (SparkStage stage : this.stages.values()) {
-            int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
-            if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) {
-                SparkJob job = this.jobs.get(jobId);
-                job.setNumSkippedStages(job.getNumSkippedStages() + 1);
-                job.setNumSkippedTasks(job.getNumSkippedTasks() + stage.getNumTasks());
-            } else {
-                this.aggregateToJob(stage);
-                this.aggregateStageToApp(stage);
-                needStoreStages.add(stage);
-            }
-            String stageId = stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
-            String stageAttemptId = stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
-            this.jobStageMap.get(jobId).remove(this.generateStageKey(stageId, stageAttemptId));
-        }
-
-        this.flushEntities(needStoreStages, false);
-        for (SparkJob job : jobs.values()) {
-            this.aggregateJobToApp(job);
-        }
-        this.flushEntities(jobs.values(), false);
-
-        app.setExecutors(executors.values().size());
-
-        long executorMemory = Utils.parseMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName()));
-        long driverMemory = Utils.parseMemory(this.isClientMode(app.getConfig())
-            ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName())
-            : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName()));
-
-        int executorCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName());
-        int driverCore = this.isClientMode(app.getConfig())
-            ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName())
-            : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName());
-
-        long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead");
-        long driverMemoryOverhead = this.isClientMode(app.getConfig())
-            ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead")
-            : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead");
-
-        app.setExecMemoryBytes(executorMemory);
-        app.setDriveMemoryBytes(driverMemory);
-        app.setExecutorCores(executorCore);
-        app.setDriverCores(driverCore);
-        app.setExecutorMemoryOverhead(executorMemoryOverhead);
-        app.setDriverMemoryOverhead(driverMemoryOverhead);
-
-        for (SparkExecutor executor : executors.values()) {
-            String executorID = executor.getTags().get(SparkJobTagName.SPARK_EXECUTOR_ID.toString());
-            if (executorID.equalsIgnoreCase("driver")) {
-                executor.setExecMemoryBytes(driverMemory);
-                executor.setCores(driverCore);
-                executor.setMemoryOverhead(driverMemoryOverhead);
-            } else {
-                executor.setExecMemoryBytes(executorMemory);
-                executor.setCores(executorCore);
-                executor.setMemoryOverhead(executorMemoryOverhead);
-            }
-            if (app.getEndTime() <= 0L) {
-                app.setEndTime(this.lastEventTime);
-            }
-            if (executor.getEndTime() <= 0L) {
-                executor.setEndTime(app.getEndTime());
-            }
-            this.aggregateExecutorToApp(executor);
-        }
-        this.flushEntities(executors.values(), false);
-        //spark code...tricky
-        app.setSkippedTasks(app.getCompleteTasks());
-        this.flushEntities(app, true);
-    }
-
-    private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) {
-        long result = 0L;
-        String fieldValue = config.getConfig().get(fieldName);
-        if (fieldValue != null) {
-            result = Utils.parseMemory(fieldValue + "m");
-            if (result == 0L) {
-               result = Utils.parseMemory(fieldValue);
-            }
-        }
-
-        if (result == 0L) {
-            result = Math.max(
-                    Utils.parseMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")),
-                    executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100);
-        }
-        return result;
-    }
-
-    private void aggregateExecutorToApp(SparkExecutor executor) {
-        long totalExecutorTime = app.getTotalExecutorTime() + executor.getEndTime() - executor.getStartTime();
-        if (totalExecutorTime < 0L) {
-            totalExecutorTime = 0L;
-        }
-        app.setTotalExecutorTime(totalExecutorTime);
-    }
-
-    private void aggregateJobToApp(SparkJob job) {
-        //aggregate job level metrics
-        app.setNumJobs(app.getNumJobs() + 1);
-        app.setTotalTasks(app.getTotalTasks() + job.getNumTask());
-        app.setCompleteTasks(app.getCompleteTasks() + job.getNumCompletedTasks());
-        app.setSkippedTasks(app.getSkippedTasks() + job.getNumSkippedTasks());
-        app.setFailedTasks(app.getFailedTasks() + job.getNumFailedTasks());
-        app.setTotalStages(app.getTotalStages() + job.getNumStages());
-        app.setFailedStages(app.getFailedStages() + job.getNumFailedStages());
-        app.setSkippedStages(app.getSkippedStages() + job.getNumSkippedStages());
-    }
-
-    private void aggregateStageToApp(SparkStage stage) {
-        //aggregate task level metrics
-        app.setDiskBytesSpilled(app.getDiskBytesSpilled() + stage.getDiskBytesSpilled());
-        app.setMemoryBytesSpilled(app.getMemoryBytesSpilled() + stage.getMemoryBytesSpilled());
-        app.setExecutorRunTime(app.getExecutorRunTime() + stage.getExecutorRunTime());
-        app.setJvmGcTime(app.getJvmGcTime() + stage.getJvmGcTime());
-        app.setExecutorDeserializeTime(app.getExecutorDeserializeTime() + stage.getExecutorDeserializeTime());
-        app.setResultSerializationTime(app.getResultSerializationTime() + stage.getResultSerializationTime());
-        app.setResultSize(app.getResultSize() + stage.getResultSize());
-        app.setInputRecords(app.getInputRecords() + stage.getInputRecords());
-        app.setInputBytes(app.getInputBytes() + stage.getInputBytes());
-        app.setOutputRecords(app.getOutputRecords() + stage.getOutputRecords());
-        app.setOutputBytes(app.getOutputBytes() + stage.getOutputBytes());
-        app.setShuffleWriteRecords(app.getShuffleWriteRecords() + stage.getShuffleWriteRecords());
-        app.setShuffleWriteBytes(app.getShuffleWriteBytes() + stage.getShuffleWriteBytes());
-        app.setShuffleReadRecords(app.getShuffleReadRecords() + stage.getShuffleReadRecords());
-        app.setShuffleReadBytes(app.getShuffleReadBytes() + stage.getShuffleReadBytes());
-    }
-
-    private void aggregateToStage(SparkTask task) {
-        String stageId = task.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
-        String stageAttemptId = task.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
-        String key = this.generateStageKey(stageId, stageAttemptId);
-        SparkStage stage = stages.get(key);
-
-        stage.setDiskBytesSpilled(stage.getDiskBytesSpilled() + task.getDiskBytesSpilled());
-        stage.setMemoryBytesSpilled(stage.getMemoryBytesSpilled() + task.getMemoryBytesSpilled());
-        stage.setExecutorRunTime(stage.getExecutorRunTime() + task.getExecutorRunTime());
-        stage.setJvmGcTime(stage.getJvmGcTime() + task.getJvmGcTime());
-        stage.setExecutorDeserializeTime(stage.getExecutorDeserializeTime() + task.getExecutorDeserializeTime());
-        stage.setResultSerializationTime(stage.getResultSerializationTime() + task.getResultSerializationTime());
-        stage.setResultSize(stage.getResultSize() + task.getResultSize());
-        stage.setInputRecords(stage.getInputRecords() + task.getInputRecords());
-        stage.setInputBytes(stage.getInputBytes() + task.getInputBytes());
-        stage.setOutputRecords(stage.getOutputRecords() + task.getOutputRecords());
-        stage.setOutputBytes(stage.getOutputBytes() + task.getOutputBytes());
-        stage.setShuffleWriteRecords(stage.getShuffleWriteRecords() + task.getShuffleWriteRecords());
-        stage.setShuffleWriteBytes(stage.getShuffleWriteBytes() + task.getShuffleWriteBytes());
-        stage.setShuffleReadRecords(stage.getShuffleReadRecords() + task.getShuffleReadRecords());
-        long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
-        stage.setShuffleReadBytes(stage.getShuffleReadBytes() + taskShuffleReadBytes);
-
-        boolean success = !task.isFailed();
-
-        Integer taskIndex = Integer.parseInt(task.getTags().get(SparkJobTagName.SPARK_TASK_INDEX.toString()));
-        if (stageTaskStatusMap.get(key).containsKey(taskIndex)) {
-            //has previous task attempt, retrieved from task index in one stage
-            boolean previousResult = stageTaskStatusMap.get(key).get(taskIndex);
-            success = previousResult || success;
-            if (previousResult != success) {
-                stage.setNumFailedTasks(stage.getNumFailedTasks() - 1);
-                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
-                stageTaskStatusMap.get(key).put(taskIndex, success);
-            }
-        } else {
-            if (success) {
-                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
-            } else {
-                stage.setNumFailedTasks(stage.getNumFailedTasks() + 1);
-            }
-            stageTaskStatusMap.get(key).put(taskIndex, success);
-        }
-
-    }
-
-    private void aggregateToExecutor(SparkTask task) {
-        String executorId = task.getExecutorId();
-        SparkExecutor executor = executors.get(executorId);
-
-        if (null != executor) {
-            executor.setTotalTasks(executor.getTotalTasks() + 1);
-            if (task.isFailed()) {
-                executor.setFailedTasks(executor.getFailedTasks() + 1);
-            } else {
-                executor.setCompletedTasks(executor.getCompletedTasks() + 1);
-            }
-            long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
-            executor.setTotalShuffleRead(executor.getTotalShuffleRead() + taskShuffleReadBytes);
-            executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime());
-            executor.setTotalInputBytes(executor.getTotalInputBytes() + task.getInputBytes());
-            executor.setTotalShuffleWrite(executor.getTotalShuffleWrite() + task.getShuffleWriteBytes());
-            executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime());
-        }
-
-    }
-
-    private void aggregateToJob(SparkStage stage) {
-        int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
-        SparkJob job = jobs.get(jobId);
-        job.setNumCompletedTasks(job.getNumCompletedTasks() + stage.getNumCompletedTasks());
-        job.setNumFailedTasks(job.getNumFailedTasks() + stage.getNumFailedTasks());
-        job.setNumTask(job.getNumTask() + stage.getNumTasks());
-
-
-        if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
-            //if multiple attempts succeed, just count one
-            if (!hasStagePriorAttemptSuccess(stage)) {
-                job.setNumCompletedStages(job.getNumCompletedStages() + 1);
-            }
-        } else {
-            job.setNumFailedStages(job.getNumFailedStages() + 1);
-        }
-    }
-
-    private boolean hasStagePriorAttemptSuccess(SparkStage stage) {
-        int stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()));
-        for (int i = 0; i < stageAttemptId; i++) {
-            SparkStage previousStage = stages.get(this.generateStageKey(
-                    stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), Integer.toString(i)));
-            if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-
-    private String generateStageKey(String stageId, String stageAttemptId) {
-        return stageId + "-" + stageAttemptId;
-    }
-
-    private void initiateStage(int jobId, int stageId, int stageAttemptId, String name, int numTasks) {
-        SparkStage stage = new SparkStage();
-        stage.setTags(new HashMap<>(this.app.getTags()));
-        stage.setTimestamp(app.getTimestamp());
-        stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
-        stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Integer.toString(stageId));
-        stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Integer.toString(stageAttemptId));
-        stage.setName(name);
-        stage.setNumActiveTasks(0);
-        stage.setNumTasks(numTasks);
-        stage.setSchedulingPool(this.app.getConfig().getConfig().get("spark.scheduler.pool") == null ?
-                "default" : this.app.getConfig().getConfig().get("spark.scheduler.pool"));
-
-        String stageKey = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
-        stages.put(stageKey, stage);
-        this.jobStageMap.get(jobId).add(stageKey);
-    }
-
-
-    private SparkExecutor initiateExecutor(String executorID, long startTime) {
-        if (!executors.containsKey(executorID)) {
-            SparkExecutor executor = new SparkExecutor();
-            executor.setTags(new HashMap<>(this.app.getTags()));
-            executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executorID);
-            executor.setStartTime(startTime);
-            executor.setTimestamp(app.getTimestamp());
-
-            this.executors.put(executorID, executor);
-        }
-
-        return this.executors.get(executorID);
-    }
-
-    private String getNormalizedName(String jobName, String assignedName) {
-        if (null != assignedName) {
-            return assignedName;
-        } else {
-            return JobNameNormalization.getInstance().normalize(jobName);
-        }
-    }
-
-    private void flushEntities(Object entity, boolean forceFlush) {
-        this.flushEntities(Collections.singletonList(entity), forceFlush);
-    }
-
-    private void flushEntities(Collection entities, boolean forceFlush) {
-        this.createEntities.addAll(entities);
-
-        if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) {
-            try {
-                this.doFlush(this.createEntities);
-                this.createEntities.clear();
-            } catch (Exception e) {
-                LOG.error("Fail to flush entities", e);
-            }
-
-        }
-    }
-
-    private EagleServiceBaseClient initiateClient() {
-        String host = conf.getString("eagleProps.eagle.service.host");
-        int port = conf.getInt("eagleProps.eagle.service.port");
-        String userName = conf.getString("eagleProps.eagle.service.username");
-        String pwd = conf.getString("eagleProps.eagle.service.password");
-        client = new EagleServiceClientImpl(host, port, userName, pwd);
-        int timeout = conf.getInt("eagleProps.eagle.service.read.timeout");
-        client.getJerseyClient().setReadTimeout(timeout * 1000);
-
-        return client;
-    }
-
-    private void doFlush(List entities) throws IOException, EagleServiceClientException {
-        client.create(entities);
-        int size = (entities == null ? 0 : entities.size());
-        LOG.info("finish flushing entities of total number " + size);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
deleted file mode 100644
index 02fc5cf..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-public class JHFSparkParser implements JHFParserBase {
-
-    private static final Logger logger = LoggerFactory.getLogger(JHFSparkParser.class);
-
-    private boolean isValidJson;
-
-    private JHFSparkEventReader eventReader;
-
-    public JHFSparkParser(JHFSparkEventReader reader) {
-        this.eventReader = reader;
-    }
-
-    @Override
-    public void parse(InputStream is) throws Exception {
-        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
-            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
-                isValidJson = true;
-                JSONObject eventObj = parseAndValidateJSON(line);
-                if (isValidJson) {
-                    try {
-                        this.eventReader.read(eventObj);
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
-
-            this.eventReader.clearReader();
-        }
-    }
-
-    private JSONObject parseAndValidateJSON(String line) {
-        JSONObject eventObj = null;
-        JSONParser parser = new JSONParser();
-        try {
-            eventObj = (JSONObject) parser.parse(line);
-        } catch (ParseException ex) {
-            isValidJson = false;
-            logger.error(String.format("Invalid json string. Fail to parse %s.", line), ex);
-        }
-        return eventObj;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java
deleted file mode 100644
index 423d045..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-public class SparkApplicationInfo {
-
-    private String state;
-    private String finalStatus;
-    private String queue;
-    private String name;
-    private String user;
-
-    public String getState() {
-        return state;
-    }
-
-    public void setState(String state) {
-        this.state = state;
-    }
-
-    public String getFinalStatus() {
-        return finalStatus;
-    }
-
-    public void setFinalStatus(String finalStatus) {
-        this.finalStatus = finalStatus;
-    }
-
-    public String getQueue() {
-        return queue;
-    }
-
-    public void setQueue(String queue) {
-        this.queue = queue;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getUser() {
-        return user;
-    }
-
-    public void setUser(String user) {
-        this.user = user;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
deleted file mode 100644
index 3964454..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import org.apache.eagle.jpm.util.SparkJobTagName;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReader {
-
-    private String site;
-    private SparkApplicationInfo app;
-
-
-    public SparkFilesystemInputStreamReaderImpl(String site, SparkApplicationInfo app) {
-        this.site = site;
-        this.app = app;
-    }
-
-    @Override
-    public void read(InputStream is) throws Exception {
-        Map<String, String> baseTags = new HashMap<>();
-        baseTags.put(SparkJobTagName.SITE.toString(), site);
-        baseTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue());
-        JHFParserBase parser = new JHFSparkParser(new JHFSparkEventReader(baseTags, this.app));
-        parser.parse(is);
-    }
-
-    public static void main(String[] args) throws Exception {
-        SparkFilesystemInputStreamReaderImpl impl = new SparkFilesystemInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo());
-        impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1")));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
new file mode 100644
index 0000000..81f266b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
@@ -0,0 +1,33 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.mr.runningentity.JobConfigSerDeser;
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+public class JPMEntityRepository extends EntityRepository {
+    public JPMEntityRepository() {
+        entitySet.add(SparkAppEntity.class);
+        entitySet.add(SparkJobEntity.class);
+        entitySet.add(SparkStageEntity.class);
+        entitySet.add(SparkTaskEntity.class);
+        entitySet.add(SparkExecutorEntity.class);
+        serDeserMap.put(JobConfig.class, new JobConfigSerDeser());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
new file mode 100644
index 0000000..0d3a86f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
@@ -0,0 +1,26 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+public class JobConfig extends HashMap<String, String> implements Serializable {
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
new file mode 100644
index 0000000..51c8a50
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
@@ -0,0 +1,476 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@Table("eagleSparkRunningApps")
+@ColumnFamily("f")
+@Prefix("sparkApp")
+@Service(Constants.RUNNING_SPARK_APP_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "user", "queue"})
+@Partition({"site"})
+public class SparkAppEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private long  startTime;
+    @Column("b")
+    private long endTime;
+    @Column("c")
+    private String yarnState;
+    @Column("d")
+    private String yarnStatus;
+    @Column("e")
+    private JobConfig config;
+    @Column("f")
+    private int numJobs;
+    @Column("g")
+    private int totalStages;
+    @Column("h")
+    private int skippedStages;
+    @Column("i")
+    private int failedStages;
+    @Column("j")
+    private int totalTasks;
+    @Column("k")
+    private int skippedTasks;
+    @Column("l")
+    private int failedTasks;
+    @Column("m")
+    private int executors;
+    @Column("n")
+    private long inputBytes;
+    @Column("o")
+    private long inputRecords;
+    @Column("p")
+    private long outputBytes;
+    @Column("q")
+    private long outputRecords;
+    @Column("r")
+    private long shuffleReadBytes;
+    @Column("s")
+    private long shuffleReadRecords;
+    @Column("t")
+    private long shuffleWriteBytes;
+    @Column("u")
+    private long shuffleWriteRecords;
+    @Column("v")
+    private long executorDeserializeTime;
+    @Column("w")
+    private long executorRunTime;
+    @Column("x")
+    private long resultSize;
+    @Column("y")
+    private long jvmGcTime;
+    @Column("z")
+    private long resultSerializationTime;
+    @Column("ab")
+    private long memoryBytesSpilled;
+    @Column("ac")
+    private long diskBytesSpilled;
+    @Column("ad")
+    private long execMemoryBytes;
+    @Column("ae")
+    private long driveMemoryBytes;
+    @Column("af")
+    private int completeTasks;
+    @Column("ag")
+    private long totalExecutorTime;
+    @Column("ah")
+    private long executorMemoryOverhead;
+    @Column("ai")
+    private long driverMemoryOverhead;
+    @Column("aj")
+    private int executorCores;
+    @Column("ak")
+    private int driverCores;
+    @Column("al")
+    private AppInfo appInfo;
+    @Column("am")
+    private int activeStages;
+    @Column("an")
+    private int completeStages;
+    @Column("ba")
+    private int activeTasks;
+
+    public int getActiveTasks() {
+        return activeTasks;
+    }
+
+    public void setActiveTasks(int activeTasks) {
+        this.activeTasks = activeTasks;
+        valueChanged("activeTasks");
+    }
+
+    public int getCompleteStages() {
+        return completeStages;
+    }
+
+    public void setCompleteStages(int completeStages) {
+        this.completeStages = completeStages;
+        valueChanged("completeStages");
+    }
+
+    public int getActiveStages() {
+        return activeStages;
+    }
+
+    public void setActiveStages(int activeStages) {
+        this.activeStages = activeStages;
+        valueChanged("activeStages");
+    }
+
+    public AppInfo getAppInfo() {
+        return appInfo;
+    }
+
+    public void setAppInfo(AppInfo appInfo) {
+        this.appInfo = appInfo;
+        valueChanged("appInfo");
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public String getYarnState() {
+        return yarnState;
+    }
+
+    public String getYarnStatus() {
+        return yarnStatus;
+    }
+
+    public int getNumJobs() {
+        return numJobs;
+    }
+
+    public int getTotalStages() {
+        return totalStages;
+    }
+
+    public int getSkippedStages() {
+        return skippedStages;
+    }
+
+    public int getFailedStages() {
+        return failedStages;
+    }
+
+    public int getTotalTasks() {
+        return totalTasks;
+    }
+
+    public int getSkippedTasks() {
+        return skippedTasks;
+    }
+
+    public int getFailedTasks() {
+        return failedTasks;
+    }
+
+    public int getExecutors() {
+        return executors;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public long getOutputRecords() {
+        return outputRecords;
+    }
+
+    public long getShuffleReadBytes() {
+        return shuffleReadBytes;
+    }
+
+    public long getShuffleReadRecords() {
+        return shuffleReadRecords;
+    }
+
+    public long getShuffleWriteBytes() {
+        return shuffleWriteBytes;
+    }
+
+    public long getShuffleWriteRecords() {
+        return shuffleWriteRecords;
+    }
+
+    public long getExecutorDeserializeTime() {
+        return executorDeserializeTime;
+    }
+
+    public long getExecutorRunTime() {
+        return executorRunTime;
+    }
+
+    public long getResultSize() {
+        return resultSize;
+    }
+
+    public long getJvmGcTime() {
+        return jvmGcTime;
+    }
+
+    public long getResultSerializationTime() {
+        return resultSerializationTime;
+    }
+
+    public long getMemoryBytesSpilled() {
+        return memoryBytesSpilled;
+    }
+
+    public long getDiskBytesSpilled() {
+        return diskBytesSpilled;
+    }
+
+    public long getExecMemoryBytes() {
+        return execMemoryBytes;
+    }
+
+    public long getDriveMemoryBytes() {
+        return driveMemoryBytes;
+    }
+
+    public int getCompleteTasks() {
+        return completeTasks;
+    }
+
+    public JobConfig getConfig() {
+        return config;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        valueChanged("endTime");
+    }
+
+    public void setYarnState(String yarnState) {
+        this.yarnState = yarnState;
+        valueChanged("yarnState");
+    }
+
+    public void setYarnStatus(String yarnStatus) {
+        this.yarnStatus = yarnStatus;
+        valueChanged("yarnStatus");
+    }
+
+    public void setConfig(JobConfig config) {
+        this.config = config;
+        valueChanged("config");
+    }
+
+    public void setNumJobs(int numJobs) {
+        this.numJobs = numJobs;
+        valueChanged("numJobs");
+    }
+
+    public void setTotalStages(int totalStages) {
+        this.totalStages = totalStages;
+        valueChanged("totalStages");
+    }
+
+    public void setSkippedStages(int skippedStages) {
+        this.skippedStages = skippedStages;
+        valueChanged("skippedStages");
+    }
+
+    public void setFailedStages(int failedStages) {
+        this.failedStages = failedStages;
+        valueChanged("failedStages");
+    }
+
+    public void setTotalTasks(int totalTasks) {
+        this.totalTasks = totalTasks;
+        valueChanged("totalTasks");
+    }
+
+    public void setSkippedTasks(int skippedTasks) {
+        this.skippedTasks = skippedTasks;
+        valueChanged("skippedTasks");
+    }
+
+    public void setFailedTasks(int failedTasks) {
+        this.failedTasks = failedTasks;
+        valueChanged("failedTasks");
+    }
+
+    public void setExecutors(int executors) {
+        this.executors = executors;
+        valueChanged("executors");
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+        valueChanged("inputBytes");
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+        valueChanged("inputRecords");
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+        valueChanged("outputBytes");
+    }
+
+    public void setOutputRecords(long outputRecords) {
+        this.outputRecords = outputRecords;
+        valueChanged("outputRecords");
+    }
+
+    public void setShuffleReadBytes(long shuffleReadRemoteBytes) {
+        this.shuffleReadBytes = shuffleReadRemoteBytes;
+        valueChanged("shuffleReadBytes");
+    }
+
+    public void setShuffleReadRecords(long shuffleReadRecords) {
+        this.shuffleReadRecords = shuffleReadRecords;
+        valueChanged("shuffleReadRecords");
+    }
+
+    public void setShuffleWriteBytes(long shuffleWriteBytes) {
+        this.shuffleWriteBytes = shuffleWriteBytes;
+        valueChanged("shuffleWriteBytes");
+    }
+
+    public void setShuffleWriteRecords(long shuffleWriteRecords) {
+        this.shuffleWriteRecords = shuffleWriteRecords;
+        valueChanged("shuffleWriteRecords");
+    }
+
+    public void setExecutorDeserializeTime(long executorDeserializeTime) {
+        this.executorDeserializeTime = executorDeserializeTime;
+        valueChanged("executorDeserializeTime");
+    }
+
+    public void setExecutorRunTime(long executorRunTime) {
+        this.executorRunTime = executorRunTime;
+        valueChanged("executorRunTime");
+    }
+
+    public void setResultSize(long resultSize) {
+        this.resultSize = resultSize;
+        valueChanged("resultSize");
+    }
+
+    public void setJvmGcTime(long jvmGcTime) {
+        this.jvmGcTime = jvmGcTime;
+        valueChanged("jvmGcTime");
+    }
+
+    public void setResultSerializationTime(long resultSerializationTime) {
+        this.resultSerializationTime = resultSerializationTime;
+        valueChanged("resultSerializationTime");
+    }
+
+    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+        this.memoryBytesSpilled = memoryBytesSpilled;
+        valueChanged("memoryBytesSpilled");
+    }
+
+    public void setDiskBytesSpilled(long diskBytesSpilled) {
+        this.diskBytesSpilled = diskBytesSpilled;
+        valueChanged("diskBytesSpilled");
+    }
+
+    public void setExecMemoryBytes(long execMemoryBytes) {
+        this.execMemoryBytes = execMemoryBytes;
+        valueChanged("execMemoryBytes");
+    }
+
+    public void setDriveMemoryBytes(long driveMemoryBytes) {
+        this.driveMemoryBytes = driveMemoryBytes;
+        valueChanged("driveMemoryBytes");
+    }
+
+    public void setCompleteTasks(int completeTasks) {
+        this.completeTasks = completeTasks;
+        valueChanged("completeTasks");
+    }
+
+    public long getTotalExecutorTime() {
+        return totalExecutorTime;
+    }
+
+    public void setTotalExecutorTime(long totalExecutorTime) {
+        this.totalExecutorTime = totalExecutorTime;
+        valueChanged("totalExecutorTime");
+    }
+
+    public long getExecutorMemoryOverhead() {
+        return executorMemoryOverhead;
+    }
+
+    public void setExecutorMemoryOverhead(long executorMemoryOverhead) {
+        this.executorMemoryOverhead = executorMemoryOverhead;
+        valueChanged("executorMemoryOverhead");
+    }
+
+    public long getDriverMemoryOverhead() {
+        return driverMemoryOverhead;
+    }
+
+    public void setDriverMemoryOverhead(long driverMemoryOverhead) {
+        this.driverMemoryOverhead = driverMemoryOverhead;
+        valueChanged("driverMemoryOverhead");
+    }
+
+    public int getExecutorCores() {
+        return executorCores;
+    }
+
+    public void setExecutorCores(int executorCores) {
+        this.executorCores = executorCores;
+        valueChanged("executorCores");
+    }
+
+    public int getDriverCores() {
+        return driverCores;
+    }
+
+    public void setDriverCores(int driverCores) {
+        this.driverCores = driverCores;
+        valueChanged("driverCores");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
new file mode 100644
index 0000000..6d0441c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
@@ -0,0 +1,233 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@Table("eagleSparkRunningExecutors")
+@ColumnFamily("f")
+@Prefix("sparkExecutor")
+@Service(Constants.RUNNING_SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "executorId","user", "queue"})
+@Partition({"site"})
+public class SparkExecutorEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private String hostPort;
+    @Column("b")
+    private int rddBlocks;
+    @Column("c")
+    private long memoryUsed;
+    @Column("d")
+    private long diskUsed;
+    @Column("e")
+    private int activeTasks = 0;
+    @Column("f")
+    private int failedTasks = 0;
+    @Column("g")
+    private int completedTasks = 0;
+    @Column("h")
+    private int totalTasks = 0;
+    @Column("i")
+    private long totalDuration = 0;
+    @Column("j")
+    private long totalInputBytes = 0;
+    @Column("k")
+    private long totalShuffleRead = 0;
+    @Column("l")
+    private long totalShuffleWrite = 0;
+    @Column("m")
+    private long maxMemory;
+    @Column("n")
+    private long startTime;
+    @Column("o")
+    private long endTime = 0;
+    @Column("p")
+    private long execMemoryBytes;
+    @Column("q")
+    private int cores;
+    @Column("r")
+    private long memoryOverhead;
+
+    public String getHostPort() {
+        return hostPort;
+    }
+
+    public void setHostPort(String hostPort) {
+        this.hostPort = hostPort;
+        this.valueChanged("hostPort");
+    }
+
+    public int getRddBlocks() {
+        return rddBlocks;
+    }
+
+    public void setRddBlocks(int rddBlocks) {
+        this.rddBlocks = rddBlocks;
+        this.valueChanged("rddBlocks");
+    }
+
+    public long getMemoryUsed() {
+        return memoryUsed;
+    }
+
+    public void setMemoryUsed(long memoryUsed) {
+        this.memoryUsed = memoryUsed;
+        this.valueChanged("memoryUsed");
+    }
+
+    public long getDiskUsed() {
+        return diskUsed;
+    }
+
+    public void setDiskUsed(long diskUsed) {
+        this.diskUsed = diskUsed;
+        this.valueChanged("diskUsed");
+    }
+
+    public int getActiveTasks() {
+        return activeTasks;
+    }
+
+    public void setActiveTasks(int activeTasks) {
+        this.activeTasks = activeTasks;
+        this.valueChanged("activeTasks");
+    }
+
+    public int getFailedTasks() {
+        return failedTasks;
+    }
+
+    public void setFailedTasks(int failedTasks) {
+        this.failedTasks = failedTasks;
+        this.valueChanged("failedTasks");
+    }
+
+    public int getCompletedTasks() {
+        return completedTasks;
+    }
+
+    public void setCompletedTasks(int completedTasks) {
+        this.completedTasks = completedTasks;
+        this.valueChanged("completedTasks");
+    }
+
+    public int getTotalTasks() {
+        return totalTasks;
+    }
+
+    public void setTotalTasks(int totalTasks) {
+        this.totalTasks = totalTasks;
+        this.valueChanged("totalTasks");
+    }
+
+    public long getTotalDuration() {
+        return totalDuration;
+    }
+
+    public void setTotalDuration(long totalDuration) {
+        this.totalDuration = totalDuration;
+        this.valueChanged("totalDuration");
+    }
+
+    public long getTotalInputBytes() {
+        return totalInputBytes;
+    }
+
+    public void setTotalInputBytes(long totalInputBytes) {
+        this.totalInputBytes = totalInputBytes;
+        this.valueChanged("totalInputBytes");
+    }
+
+    public long getTotalShuffleRead() {
+        return totalShuffleRead;
+    }
+
+    public void setTotalShuffleRead(long totalShuffleRead) {
+        this.totalShuffleRead = totalShuffleRead;
+        this.valueChanged("totalShuffleRead");
+    }
+
+    public long getTotalShuffleWrite() {
+        return totalShuffleWrite;
+    }
+
+    public void setTotalShuffleWrite(long totalShuffleWrite) {
+        this.totalShuffleWrite = totalShuffleWrite;
+        this.valueChanged("totalShuffleWrite");
+    }
+
+    public long getMaxMemory() {
+        return maxMemory;
+    }
+
+    public void setMaxMemory(long maxMemory) {
+        this.maxMemory = maxMemory;
+        this.valueChanged("maxMemory");
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        this.valueChanged("endTime");
+    }
+
+    public long getExecMemoryBytes() {
+        return execMemoryBytes;
+    }
+
+    public void setExecMemoryBytes(long execMemoryBytes) {
+        this.execMemoryBytes = execMemoryBytes;
+        this.valueChanged("execMemoryBytes");
+    }
+
+    public int getCores() {
+        return cores;
+    }
+
+    public void setCores(int cores) {
+        this.cores = cores;
+        valueChanged("cores");
+    }
+
+    public long getMemoryOverhead() {
+        return memoryOverhead;
+    }
+
+    public void setMemoryOverhead(long memoryOverhead) {
+        this.memoryOverhead = memoryOverhead;
+        valueChanged("memoryOverhead");
+    }
+}