Posted to commits@eagle.apache.org by qi...@apache.org on 2016/09/07 02:36:40 UTC

[1/3] incubator-eagle git commit: Update spark history job feeder config & refactor the code

Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 8774b85cd -> 3110c72e4


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
deleted file mode 100644
index 7de1530..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
+++ /dev/null
@@ -1,475 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-@Table("eagleSparkRunningApps")
-@ColumnFamily("f")
-@Prefix("sparkApp")
-@Service(Constants.RUNNING_SPARK_APP_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(true)
-@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "user", "queue"})
-@Partition({"site"})
-public class SparkAppEntity extends TaggedLogAPIEntity {
-    @Column("a")
-    private long  startTime;
-    @Column("b")
-    private long endTime;
-    @Column("c")
-    private String yarnState;
-    @Column("d")
-    private String yarnStatus;
-    @Column("e")
-    private JobConfig config;
-    @Column("f")
-    private int numJobs;
-    @Column("g")
-    private int totalStages;
-    @Column("h")
-    private int skippedStages;
-    @Column("i")
-    private int failedStages;
-    @Column("j")
-    private int totalTasks;
-    @Column("k")
-    private int skippedTasks;
-    @Column("l")
-    private int failedTasks;
-    @Column("m")
-    private int executors;
-    @Column("n")
-    private long inputBytes;
-    @Column("o")
-    private long inputRecords;
-    @Column("p")
-    private long outputBytes;
-    @Column("q")
-    private long outputRecords;
-    @Column("r")
-    private long shuffleReadBytes;
-    @Column("s")
-    private long shuffleReadRecords;
-    @Column("t")
-    private long shuffleWriteBytes;
-    @Column("u")
-    private long shuffleWriteRecords;
-    @Column("v")
-    private long executorDeserializeTime;
-    @Column("w")
-    private long executorRunTime;
-    @Column("x")
-    private long resultSize;
-    @Column("y")
-    private long jvmGcTime;
-    @Column("z")
-    private long resultSerializationTime;
-    @Column("ab")
-    private long memoryBytesSpilled;
-    @Column("ac")
-    private long diskBytesSpilled;
-    @Column("ad")
-    private long execMemoryBytes;
-    @Column("ae")
-    private long driveMemoryBytes;
-    @Column("af")
-    private int completeTasks;
-    @Column("ag")
-    private long totalExecutorTime;
-    @Column("ah")
-    private long executorMemoryOverhead;
-    @Column("ai")
-    private long driverMemoryOverhead;
-    @Column("aj")
-    private int executorCores;
-    @Column("ak")
-    private int driverCores;
-    @Column("al")
-    private AppInfo appInfo;
-    @Column("am")
-    private int activeStages;
-    @Column("an")
-    private int completeStages;
-    @Column("ba")
-    private int activeTasks;
-
-    public int getActiveTasks() {
-        return activeTasks;
-    }
-
-    public void setActiveTasks(int activeTasks) {
-        this.activeTasks = activeTasks;
-        valueChanged("activeTasks");
-    }
-
-    public int getCompleteStages() {
-        return completeStages;
-    }
-
-    public void setCompleteStages(int completeStages) {
-        this.completeStages = completeStages;
-        valueChanged("completeStages");
-    }
-
-    public int getActiveStages() {
-        return activeStages;
-    }
-
-    public void setActiveStages(int activeStages) {
-        this.activeStages = activeStages;
-        valueChanged("activeStages");
-    }
-
-    public AppInfo getAppInfo() {
-        return appInfo;
-    }
-
-    public void setAppInfo(AppInfo appInfo) {
-        this.appInfo = appInfo;
-        valueChanged("appInfo");
-    }
-
-    public long getStartTime() {
-        return startTime;
-    }
-
-    public long getEndTime() {
-        return endTime;
-    }
-
-    public String getYarnState() {
-        return yarnState;
-    }
-
-    public String getYarnStatus() {
-        return yarnStatus;
-    }
-
-    public int getNumJobs() {
-        return numJobs;
-    }
-
-    public int getTotalStages() {
-        return totalStages;
-    }
-
-    public int getSkippedStages() {
-        return skippedStages;
-    }
-
-    public int getFailedStages() {
-        return failedStages;
-    }
-
-    public int getTotalTasks() {
-        return totalTasks;
-    }
-
-    public int getSkippedTasks() {
-        return skippedTasks;
-    }
-
-    public int getFailedTasks() {
-        return failedTasks;
-    }
-
-    public int getExecutors() {
-        return executors;
-    }
-
-    public long getInputBytes() {
-        return inputBytes;
-    }
-
-    public long getInputRecords() {
-        return inputRecords;
-    }
-
-    public long getOutputBytes() {
-        return outputBytes;
-    }
-
-    public long getOutputRecords() {
-        return outputRecords;
-    }
-
-    public long getShuffleReadBytes() {
-        return shuffleReadBytes;
-    }
-
-    public long getShuffleReadRecords() {
-        return shuffleReadRecords;
-    }
-
-    public long getShuffleWriteBytes() {
-        return shuffleWriteBytes;
-    }
-
-    public long getShuffleWriteRecords() {
-        return shuffleWriteRecords;
-    }
-
-    public long getExecutorDeserializeTime() {
-        return executorDeserializeTime;
-    }
-
-    public long getExecutorRunTime() {
-        return executorRunTime;
-    }
-
-    public long getResultSize() {
-        return resultSize;
-    }
-
-    public long getJvmGcTime() {
-        return jvmGcTime;
-    }
-
-    public long getResultSerializationTime() {
-        return resultSerializationTime;
-    }
-
-    public long getMemoryBytesSpilled() {
-        return memoryBytesSpilled;
-    }
-
-    public long getDiskBytesSpilled() {
-        return diskBytesSpilled;
-    }
-
-    public long getExecMemoryBytes() {
-        return execMemoryBytes;
-    }
-
-    public long getDriveMemoryBytes() {
-        return driveMemoryBytes;
-    }
-
-    public int getCompleteTasks() {
-        return completeTasks;
-    }
-
-    public JobConfig getConfig() {
-        return config;
-    }
-
-    public void setStartTime(long startTime) {
-        this.startTime = startTime;
-        valueChanged("startTime");
-    }
-
-    public void setEndTime(long endTime) {
-        this.endTime = endTime;
-        valueChanged("endTime");
-    }
-
-    public void setYarnState(String yarnState) {
-        this.yarnState = yarnState;
-        valueChanged("yarnState");
-    }
-
-    public void setYarnStatus(String yarnStatus) {
-        this.yarnStatus = yarnStatus;
-        valueChanged("yarnStatus");
-    }
-
-    public void setConfig(JobConfig config) {
-        this.config = config;
-        valueChanged("config");
-    }
-
-    public void setNumJobs(int numJobs) {
-        this.numJobs = numJobs;
-        valueChanged("numJobs");
-    }
-
-    public void setTotalStages(int totalStages) {
-        this.totalStages = totalStages;
-        valueChanged("totalStages");
-    }
-
-    public void setSkippedStages(int skippedStages) {
-        this.skippedStages = skippedStages;
-        valueChanged("skippedStages");
-    }
-
-    public void setFailedStages(int failedStages) {
-        this.failedStages = failedStages;
-        valueChanged("failedStages");
-    }
-
-    public void setTotalTasks(int totalTasks) {
-        this.totalTasks = totalTasks;
-        valueChanged("totalTasks");
-    }
-
-    public void setSkippedTasks(int skippedTasks) {
-        this.skippedTasks = skippedTasks;
-        valueChanged("skippedTasks");
-    }
-
-    public void setFailedTasks(int failedTasks) {
-        this.failedTasks = failedTasks;
-        valueChanged("failedTasks");
-    }
-
-    public void setExecutors(int executors) {
-        this.executors = executors;
-        valueChanged("executors");
-    }
-
-    public void setInputBytes(long inputBytes) {
-        this.inputBytes = inputBytes;
-        valueChanged("inputBytes");
-    }
-
-    public void setInputRecords(long inputRecords) {
-        this.inputRecords = inputRecords;
-        valueChanged("inputRecords");
-    }
-
-    public void setOutputBytes(long outputBytes) {
-        this.outputBytes = outputBytes;
-        valueChanged("outputBytes");
-    }
-
-    public void setOutputRecords(long outputRecords) {
-        this.outputRecords = outputRecords;
-        valueChanged("outputRecords");
-    }
-
-    public void setShuffleReadBytes(long shuffleReadRemoteBytes) {
-        this.shuffleReadBytes = shuffleReadRemoteBytes;
-        valueChanged("shuffleReadBytes");
-    }
-
-    public void setShuffleReadRecords(long shuffleReadRecords) {
-        this.shuffleReadRecords = shuffleReadRecords;
-        valueChanged("shuffleReadRecords");
-    }
-
-    public void setShuffleWriteBytes(long shuffleWriteBytes) {
-        this.shuffleWriteBytes = shuffleWriteBytes;
-        valueChanged("shuffleWriteBytes");
-    }
-
-    public void setShuffleWriteRecords(long shuffleWriteRecords) {
-        this.shuffleWriteRecords = shuffleWriteRecords;
-        valueChanged("shuffleWriteRecords");
-    }
-
-    public void setExecutorDeserializeTime(long executorDeserializeTime) {
-        this.executorDeserializeTime = executorDeserializeTime;
-        valueChanged("executorDeserializeTime");
-    }
-
-    public void setExecutorRunTime(long executorRunTime) {
-        this.executorRunTime = executorRunTime;
-        valueChanged("executorRunTime");
-    }
-
-    public void setResultSize(long resultSize) {
-        this.resultSize = resultSize;
-        valueChanged("resultSize");
-    }
-
-    public void setJvmGcTime(long jvmGcTime) {
-        this.jvmGcTime = jvmGcTime;
-        valueChanged("jvmGcTime");
-    }
-
-    public void setResultSerializationTime(long resultSerializationTime) {
-        this.resultSerializationTime = resultSerializationTime;
-        valueChanged("resultSerializationTime");
-    }
-
-    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
-        this.memoryBytesSpilled = memoryBytesSpilled;
-        valueChanged("memoryBytesSpilled");
-    }
-
-    public void setDiskBytesSpilled(long diskBytesSpilled) {
-        this.diskBytesSpilled = diskBytesSpilled;
-        valueChanged("diskBytesSpilled");
-    }
-
-    public void setExecMemoryBytes(long execMemoryBytes) {
-        this.execMemoryBytes = execMemoryBytes;
-        valueChanged("execMemoryBytes");
-    }
-
-    public void setDriveMemoryBytes(long driveMemoryBytes) {
-        this.driveMemoryBytes = driveMemoryBytes;
-        valueChanged("driveMemoryBytes");
-    }
-
-    public void setCompleteTasks(int completeTasks) {
-        this.completeTasks = completeTasks;
-        valueChanged("completeTasks");
-    }
-
-    public long getTotalExecutorTime() {
-        return totalExecutorTime;
-    }
-
-    public void setTotalExecutorTime(long totalExecutorTime) {
-        this.totalExecutorTime = totalExecutorTime;
-        valueChanged("totalExecutorTime");
-    }
-
-    public long getExecutorMemoryOverhead() {
-        return executorMemoryOverhead;
-    }
-
-    public void setExecutorMemoryOverhead(long executorMemoryOverhead) {
-        this.executorMemoryOverhead = executorMemoryOverhead;
-        valueChanged("executorMemoryOverhead");
-    }
-
-    public long getDriverMemoryOverhead() {
-        return driverMemoryOverhead;
-    }
-
-    public void setDriverMemoryOverhead(long driverMemoryOverhead) {
-        this.driverMemoryOverhead = driverMemoryOverhead;
-        valueChanged("driverMemoryOverhead");
-    }
-
-    public int getExecutorCores() {
-        return executorCores;
-    }
-
-    public void setExecutorCores(int executorCores) {
-        this.executorCores = executorCores;
-        valueChanged("executorCores");
-    }
-
-    public int getDriverCores() {
-        return driverCores;
-    }
-
-    public void setDriverCores(int driverCores) {
-        this.driverCores = driverCores;
-        valueChanged("driverCores");
-    }
-}
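
All the entity classes in this commit follow the same TaggedLogAPIEntity
pattern: each setter assigns the field and then calls valueChanged("..."),
which, assuming the usual Eagle entity contract, flags that column as
modified so the storage layer only writes back fields that actually
changed. A minimal usage sketch under that assumption (all values are
hypothetical):

    SparkAppEntity app = new SparkAppEntity();
    app.setStartTime(System.currentTimeMillis()); // flags "startTime" as changed
    app.setYarnState("RUNNING");                  // hypothetical YARN state string
    app.setNumJobs(2);
    // only the three columns touched above are marked for persistence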

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
deleted file mode 100644
index 89549ca..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-@Table("eagleSparkRunningExecutors")
-@ColumnFamily("f")
-@Prefix("sparkExecutor")
-@Service(Constants.RUNNING_SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(true)
-@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "executorId","user", "queue"})
-@Partition({"site"})
-public class SparkExecutorEntity extends TaggedLogAPIEntity {
-    @Column("a")
-    private String hostPort;
-    @Column("b")
-    private int rddBlocks;
-    @Column("c")
-    private long memoryUsed;
-    @Column("d")
-    private long diskUsed;
-    @Column("e")
-    private int activeTasks = 0;
-    @Column("f")
-    private int failedTasks = 0;
-    @Column("g")
-    private int completedTasks = 0;
-    @Column("h")
-    private int totalTasks = 0;
-    @Column("i")
-    private long totalDuration = 0;
-    @Column("j")
-    private long totalInputBytes = 0;
-    @Column("k")
-    private long totalShuffleRead = 0;
-    @Column("l")
-    private long totalShuffleWrite = 0;
-    @Column("m")
-    private long maxMemory;
-    @Column("n")
-    private long startTime;
-    @Column("o")
-    private long endTime = 0;
-    @Column("p")
-    private long execMemoryBytes;
-    @Column("q")
-    private int cores;
-    @Column("r")
-    private long memoryOverhead;
-
-    public String getHostPort() {
-        return hostPort;
-    }
-
-    public void setHostPort(String hostPort) {
-        this.hostPort = hostPort;
-        this.valueChanged("hostPort");
-    }
-
-    public int getRddBlocks() {
-        return rddBlocks;
-    }
-
-    public void setRddBlocks(int rddBlocks) {
-        this.rddBlocks = rddBlocks;
-        this.valueChanged("rddBlocks");
-    }
-
-    public long getMemoryUsed() {
-        return memoryUsed;
-    }
-
-    public void setMemoryUsed(long memoryUsed) {
-        this.memoryUsed = memoryUsed;
-        this.valueChanged("memoryUsed");
-    }
-
-    public long getDiskUsed() {
-        return diskUsed;
-    }
-
-    public void setDiskUsed(long diskUsed) {
-        this.diskUsed = diskUsed;
-        this.valueChanged("diskUsed");
-    }
-
-    public int getActiveTasks() {
-        return activeTasks;
-    }
-
-    public void setActiveTasks(int activeTasks) {
-        this.activeTasks = activeTasks;
-        this.valueChanged("activeTasks");
-    }
-
-    public int getFailedTasks() {
-        return failedTasks;
-    }
-
-    public void setFailedTasks(int failedTasks) {
-        this.failedTasks = failedTasks;
-        this.valueChanged("failedTasks");
-    }
-
-    public int getCompletedTasks() {
-        return completedTasks;
-    }
-
-    public void setCompletedTasks(int completedTasks) {
-        this.completedTasks = completedTasks;
-        this.valueChanged("completedTasks");
-    }
-
-    public int getTotalTasks() {
-        return totalTasks;
-    }
-
-    public void setTotalTasks(int totalTasks) {
-        this.totalTasks = totalTasks;
-        this.valueChanged("totalTasks");
-    }
-
-    public long getTotalDuration() {
-        return totalDuration;
-    }
-
-    public void setTotalDuration(long totalDuration) {
-        this.totalDuration = totalDuration;
-        this.valueChanged("totalDuration");
-    }
-
-    public long getTotalInputBytes() {
-        return totalInputBytes;
-    }
-
-    public void setTotalInputBytes(long totalInputBytes) {
-        this.totalInputBytes = totalInputBytes;
-        this.valueChanged("totalInputBytes");
-    }
-
-    public long getTotalShuffleRead() {
-        return totalShuffleRead;
-    }
-
-    public void setTotalShuffleRead(long totalShuffleRead) {
-        this.totalShuffleRead = totalShuffleRead;
-        this.valueChanged("totalShuffleRead");
-    }
-
-    public long getTotalShuffleWrite() {
-        return totalShuffleWrite;
-    }
-
-    public void setTotalShuffleWrite(long totalShuffleWrite) {
-        this.totalShuffleWrite = totalShuffleWrite;
-        this.valueChanged("totalShuffleWrite");
-    }
-
-    public long getMaxMemory() {
-        return maxMemory;
-    }
-
-    public void setMaxMemory(long maxMemory) {
-        this.maxMemory = maxMemory;
-        this.valueChanged("maxMemory");
-    }
-
-    public long getStartTime() {
-        return startTime;
-    }
-
-    public void setStartTime(long startTime) {
-        this.startTime = startTime;
-        valueChanged("startTime");
-    }
-
-    public long getEndTime() {
-        return endTime;
-    }
-
-    public void setEndTime(long endTime) {
-        this.endTime = endTime;
-        this.valueChanged("endTime");
-    }
-
-    public long getExecMemoryBytes() {
-        return execMemoryBytes;
-    }
-
-    public void setExecMemoryBytes(long execMemoryBytes) {
-        this.execMemoryBytes = execMemoryBytes;
-        this.valueChanged("execMemoryBytes");
-    }
-
-    public int getCores() {
-        return cores;
-    }
-
-    public void setCores(int cores) {
-        this.cores = cores;
-        valueChanged("cores");
-    }
-
-    public long getMemoryOverhead() {
-        return memoryOverhead;
-    }
-
-    public void setMemoryOverhead(long memoryOverhead) {
-        this.memoryOverhead = memoryOverhead;
-        valueChanged("memoryOverhead");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
deleted file mode 100644
index bb56b52..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-import java.util.List;
-
-@Table("eagleSparkRunningJobs")
-@ColumnFamily("f")
-@Prefix("sparkJob")
-@Service(Constants.RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(true)
-@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId","user", "queue"})
-@Partition({"site"})
-public class SparkJobEntity extends TaggedLogAPIEntity {
-    @Column("a")
-    private long  submissionTime;
-    @Column("b")
-    private long completionTime;
-    @Column("c")
-    private int numStages = 0;
-    @Column("d")
-    private String status;
-    @Column("e")
-    private int numTask = 0;
-    @Column("f")
-    private int numActiveTasks = 0;
-    @Column("g")
-    private int numCompletedTasks = 0;
-    @Column("h")
-    private int numSkippedTasks = 0;
-    @Column("i")
-    private int numFailedTasks = 0;
-    @Column("j")
-    private int numActiveStages = 0;
-    @Column("k")
-    private int numCompletedStages = 0;
-    @Column("l")
-    private int numSkippedStages = 0;
-    @Column("m")
-    private int numFailedStages = 0;
-    @Column("n")
-    private List<Integer> stages;
-
-    public List<Integer> getStages() {
-        return stages;
-    }
-
-    public void setStages(List<Integer> stages) {
-        this.stages = stages;
-        this.valueChanged("stages");
-    }
-
-    public long getSubmissionTime() {
-        return submissionTime;
-    }
-
-    public long getCompletionTime() {
-        return completionTime;
-    }
-
-    public int getNumStages() {
-        return numStages;
-    }
-
-    public String getStatus() {
-        return status;
-    }
-
-    public int getNumTask() {
-        return numTask;
-    }
-
-    public int getNumActiveTasks() {
-        return numActiveTasks;
-    }
-
-    public int getNumCompletedTasks() {
-        return numCompletedTasks;
-    }
-
-    public int getNumSkippedTasks() {
-        return numSkippedTasks;
-    }
-
-    public int getNumFailedTasks() {
-        return numFailedTasks;
-    }
-
-    public int getNumActiveStages() {
-        return numActiveStages;
-    }
-
-    public int getNumCompletedStages() {
-        return numCompletedStages;
-    }
-
-    public int getNumSkippedStages() {
-        return numSkippedStages;
-    }
-
-    public int getNumFailedStages() {
-        return numFailedStages;
-    }
-
-    public void setSubmissionTime(long submissionTime) {
-        this.submissionTime = submissionTime;
-        this.valueChanged("submissionTime");
-    }
-
-    public void setCompletionTime(long completionTime) {
-        this.completionTime = completionTime;
-        this.valueChanged("completionTime");
-    }
-
-    public void setNumStages(int numStages) {
-        this.numStages = numStages;
-        this.valueChanged("numStages");
-    }
-
-    public void setStatus(String status) {
-        this.status = status;
-        this.valueChanged("status");
-    }
-
-    public void setNumTask(int numTask) {
-        this.numTask = numTask;
-        this.valueChanged("numTask");
-    }
-
-    public void setNumActiveTasks(int numActiveTasks) {
-        this.numActiveTasks = numActiveTasks;
-        this.valueChanged("numActiveTasks");
-    }
-
-    public void setNumCompletedTasks(int numCompletedTasks) {
-        this.numCompletedTasks = numCompletedTasks;
-        this.valueChanged("numCompletedTasks");
-    }
-
-    public void setNumSkippedTasks(int numSkippedTasks) {
-        this.numSkippedTasks = numSkippedTasks;
-        this.valueChanged("numSkippedTasks");
-    }
-
-    public void setNumFailedTasks(int numFailedTasks) {
-        this.numFailedTasks = numFailedTasks;
-        this.valueChanged("numFailedTasks");
-    }
-
-    public void setNumActiveStages(int numActiveStages) {
-        this.numActiveStages = numActiveStages;
-        this.valueChanged("numActiveStages");
-    }
-
-    public void setNumCompletedStages(int numCompletedStages) {
-        this.numCompletedStages = numCompletedStages;
-        this.valueChanged("numCompletedStages");
-    }
-
-    public void setNumSkippedStages(int numSkippedStages) {
-        this.numSkippedStages = numSkippedStages;
-        this.valueChanged("numSkippedStages");
-    }
-
-    public void setNumFailedStages(int numFailedStages) {
-        this.numFailedStages = numFailedStages;
-        this.valueChanged("numFailedStages");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
deleted file mode 100644
index be0ffd0..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-@Table("eagleSparkRunningStages")
-@ColumnFamily("f")
-@Prefix("sparkStage")
-@Service(Constants.RUNNING_SPARK_STAGE_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(true)
-@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", "stageId","stageAttemptId","user", "queue"})
-@Partition({"site"})
-public class SparkStageEntity extends TaggedLogAPIEntity {
-    @Column("a")
-    private String status;
-    @Column("b")
-    private int numActiveTasks = 0;
-    @Column("c")
-    private int numCompletedTasks = 0;
-    @Column("d")
-    private int numFailedTasks = 0;
-    @Column("e")
-    private long executorRunTime = 0L;
-    @Column("f")
-    private long inputBytes = 0L;
-    @Column("g")
-    private long inputRecords = 0L;
-    @Column("h")
-    private long outputBytes = 0L;
-    @Column("i")
-    private long outputRecords = 0L;
-    @Column("j")
-    private long shuffleReadBytes = 0L;
-    @Column("k")
-    private long shuffleReadRecords = 0L;
-    @Column("l")
-    private long shuffleWriteBytes = 0L;
-    @Column("m")
-    private long shuffleWriteRecords = 0L;
-    @Column("n")
-    private long memoryBytesSpilled = 0L;
-    @Column("o")
-    private long diskBytesSpilled = 0L;
-    @Column("p")
-    private String name;
-    @Column("q")
-    private String schedulingPool;
-    @Column("r")
-    private long submitTime;
-    @Column("s")
-    private long completeTime;
-    @Column("t")
-    private int numTasks;
-    @Column("u")
-    private long executorDeserializeTime;
-    @Column("v")
-    private long resultSize;
-    @Column("w")
-    private long jvmGcTime;
-    @Column("x")
-    private long resultSerializationTime;
-
-    public String getStatus() {
-        return status;
-    }
-
-    public int getNumActiveTasks() {
-        return numActiveTasks;
-    }
-
-    public int getNumCompletedTasks() {
-        return numCompletedTasks;
-    }
-
-    public int getNumFailedTasks() {
-        return numFailedTasks;
-    }
-
-    public long getExecutorRunTime() {
-        return executorRunTime;
-    }
-
-    public long getInputBytes() {
-        return inputBytes;
-    }
-
-    public long getInputRecords() {
-        return inputRecords;
-    }
-
-    public long getOutputBytes() {
-        return outputBytes;
-    }
-
-    public long getOutputRecords() {
-        return outputRecords;
-    }
-
-    public long getShuffleReadBytes() {
-        return shuffleReadBytes;
-    }
-
-    public long getShuffleReadRecords() {
-        return shuffleReadRecords;
-    }
-
-    public long getShuffleWriteBytes() {
-        return shuffleWriteBytes;
-    }
-
-    public long getShuffleWriteRecords() {
-        return shuffleWriteRecords;
-    }
-
-    public long getMemoryBytesSpilled() {
-        return memoryBytesSpilled;
-    }
-
-    public long getDiskBytesSpilled() {
-        return diskBytesSpilled;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public String getSchedulingPool() {
-        return schedulingPool;
-    }
-
-    public long getSubmitTime() {
-        return submitTime;
-    }
-
-    public long getCompleteTime() {
-        return completeTime;
-    }
-
-    public int getNumTasks() {
-        return numTasks;
-    }
-
-    public long getExecutorDeserializeTime() {
-        return executorDeserializeTime;
-    }
-
-    public long getResultSize() {
-        return resultSize;
-    }
-
-    public long getJvmGcTime() {
-        return jvmGcTime;
-    }
-
-    public long getResultSerializationTime() {
-        return resultSerializationTime;
-    }
-
-    public void setStatus(String status) {
-        this.status = status;
-        this.valueChanged("status");
-    }
-
-    public void setNumActiveTasks(int numActiveTasks) {
-        this.numActiveTasks = numActiveTasks;
-        this.valueChanged("numActiveTasks");
-    }
-
-    public void setNumCompletedTasks(int numCompletedTasks) {
-        this.numCompletedTasks = numCompletedTasks;
-        this.valueChanged("numCompletedTasks");
-    }
-
-    public void setNumFailedTasks(int numFailedTasks) {
-        this.numFailedTasks = numFailedTasks;
-        this.valueChanged("numFailedTasks");
-    }
-
-    public void setExecutorRunTime(long executorRunTime) {
-        this.executorRunTime = executorRunTime;
-        this.valueChanged("executorRunTime");
-    }
-
-    public void setInputBytes(long inputBytes) {
-        this.inputBytes = inputBytes;
-        this.valueChanged("inputBytes");
-    }
-
-    public void setInputRecords(long inputRecords) {
-        this.inputRecords = inputRecords;
-        this.valueChanged("inputRecords");
-    }
-
-    public void setOutputBytes(long outputBytes) {
-        this.outputBytes = outputBytes;
-        this.valueChanged("outputBytes");
-    }
-
-    public void setOutputRecords(long outputRecords) {
-        this.outputRecords = outputRecords;
-        this.valueChanged("outputRecords");
-    }
-
-    public void setShuffleReadBytes(long shuffleReadBytes) {
-        this.shuffleReadBytes = shuffleReadBytes;
-        this.valueChanged("shuffleReadBytes");
-    }
-
-    public void setShuffleReadRecords(long shuffleReadRecords) {
-        this.shuffleReadRecords = shuffleReadRecords;
-        this.valueChanged("shuffleReadRecords");
-    }
-
-    public void setShuffleWriteBytes(long shuffleWriteBytes) {
-        this.shuffleWriteBytes = shuffleWriteBytes;
-        this.valueChanged("shuffleWriteBytes");
-    }
-
-    public void setShuffleWriteRecords(long shuffleWriteRecords) {
-        this.shuffleWriteRecords = shuffleWriteRecords;
-        this.valueChanged("shuffleWriteRecords");
-    }
-
-    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
-        this.memoryBytesSpilled = memoryBytesSpilled;
-        this.valueChanged("memoryBytesSpilled");
-    }
-
-    public void setDiskBytesSpilled(long diskBytesSpilled) {
-        this.diskBytesSpilled = diskBytesSpilled;
-        this.valueChanged("diskBytesSpilled");
-    }
-
-    public void setName(String name) {
-        this.name = name;
-        this.valueChanged("name");
-    }
-
-    public void setSchedulingPool(String schedulingPool) {
-        this.schedulingPool = schedulingPool;
-        this.valueChanged("schedulingPool");
-    }
-
-    public void setSubmitTime(long submitTime) {
-        this.submitTime = submitTime;
-        this.valueChanged("submitTime");
-    }
-
-    public void setCompleteTime(long completeTime) {
-        this.completeTime = completeTime;
-        this.valueChanged("completeTime");
-    }
-
-    public void setNumTasks(int numTasks) {
-        this.numTasks = numTasks;
-        valueChanged("numTasks");
-    }
-
-    public void setExecutorDeserializeTime(long executorDeserializeTime) {
-        this.executorDeserializeTime = executorDeserializeTime;
-        valueChanged("executorDeserializeTime");
-    }
-
-    public void setResultSize(long resultSize) {
-        this.resultSize = resultSize;
-        valueChanged("resultSize");
-    }
-
-    public void setJvmGcTime(long jvmGcTime) {
-        this.jvmGcTime = jvmGcTime;
-        valueChanged("jvmGcTime");
-    }
-
-    public void setResultSerializationTime(long resultSerializationTime) {
-        this.resultSerializationTime = resultSerializationTime;
-        valueChanged("resultSerializationTime");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
deleted file mode 100644
index e531806..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-
-@Table("eagleSparkRunningTasks")
-@ColumnFamily("f")
-@Prefix("sparkTask")
-@Service(Constants.RUNNING_SPARK_TASK_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(true)
-@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", "jobName", "stageId","stageAttemptId","taskIndex","taskAttemptId","user", "queue"})
-@Partition({"site"})
-public class SparkTaskEntity extends TaggedLogAPIEntity {
-    @Column("a")
-    private int taskId;
-    @Column("b")
-    private long launchTime;
-    @Column("c")
-    private String executorId;
-    @Column("d")
-    private String host;
-    @Column("e")
-    private String taskLocality;
-    @Column("f")
-    private boolean speculative;
-    @Column("g")
-    private long executorDeserializeTime;
-    @Column("h")
-    private long executorRunTime;
-    @Column("i")
-    private long resultSize;
-    @Column("j")
-    private long jvmGcTime;
-    @Column("k")
-    private long resultSerializationTime;
-    @Column("l")
-    private long memoryBytesSpilled;
-    @Column("m")
-    private long diskBytesSpilled;
-    @Column("n")
-    private long inputBytes;
-    @Column("o")
-    private long inputRecords;
-    @Column("p")
-    private long outputBytes;
-    @Column("q")
-    private long outputRecords;
-    @Column("r")
-    private long shuffleReadRemoteBytes;
-    @Column("x")
-    private long shuffleReadLocalBytes;
-    @Column("s")
-    private long shuffleReadRecords;
-    @Column("t")
-    private long shuffleWriteBytes;
-    @Column("u")
-    private long shuffleWriteRecords;
-    @Column("v")
-    private boolean failed;
-
-    public int getTaskId() {
-        return taskId;
-    }
-
-    public long getLaunchTime() {
-        return launchTime;
-    }
-
-    public String getExecutorId() {
-        return executorId;
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public String getTaskLocality() {
-        return taskLocality;
-    }
-
-    public boolean isSpeculative() {
-        return speculative;
-    }
-
-    public long getExecutorDeserializeTime() {
-        return executorDeserializeTime;
-    }
-
-    public long getExecutorRunTime() {
-        return executorRunTime;
-    }
-
-    public long getResultSize() {
-        return resultSize;
-    }
-
-    public long getJvmGcTime() {
-        return jvmGcTime;
-    }
-
-    public long getResultSerializationTime() {
-        return resultSerializationTime;
-    }
-
-    public long getMemoryBytesSpilled() {
-        return memoryBytesSpilled;
-    }
-
-    public long getDiskBytesSpilled() {
-        return diskBytesSpilled;
-    }
-
-    public long getInputBytes() {
-        return inputBytes;
-    }
-
-    public long getInputRecords() {
-        return inputRecords;
-    }
-
-    public long getOutputBytes() {
-        return outputBytes;
-    }
-
-    public long getOutputRecords() {
-        return outputRecords;
-    }
-
-    public long getShuffleReadRecords() {
-        return shuffleReadRecords;
-    }
-
-    public long getShuffleWriteBytes() {
-        return shuffleWriteBytes;
-    }
-
-    public long getShuffleWriteRecords() {
-        return shuffleWriteRecords;
-    }
-
-    public boolean isFailed() {
-        return failed;
-    }
-
-    public long getShuffleReadRemoteBytes() {
-        return shuffleReadRemoteBytes;
-    }
-
-    public long getShuffleReadLocalBytes() {
-        return shuffleReadLocalBytes;
-    }
-
-    public void setFailed(boolean failed) {
-        this.failed = failed;
-        valueChanged("failed");
-    }
-
-    public void setTaskId(int taskId) {
-        this.taskId = taskId;
-        valueChanged("taskId");
-    }
-
-    public void setLaunchTime(long launchTime) {
-        this.launchTime = launchTime;
-        valueChanged("launchTime");
-    }
-
-    public void setExecutorId(String executorId) {
-        this.executorId = executorId;
-        valueChanged("executorId");
-    }
-
-    public void setHost(String host) {
-        this.host = host;
-        this.valueChanged("host");
-    }
-
-    public void setTaskLocality(String taskLocality) {
-        this.taskLocality = taskLocality;
-        this.valueChanged("taskLocality");
-    }
-
-    public void setSpeculative(boolean speculative) {
-        this.speculative = speculative;
-        this.valueChanged("speculative");
-    }
-
-    public void setExecutorDeserializeTime(long executorDeserializeTime) {
-        this.executorDeserializeTime = executorDeserializeTime;
-        this.valueChanged("executorDeserializeTime");
-    }
-
-    public void setExecutorRunTime(long executorRunTime) {
-        this.executorRunTime = executorRunTime;
-        this.valueChanged("executorRunTime");
-    }
-
-    public void setResultSize(long resultSize) {
-        this.resultSize = resultSize;
-        this.valueChanged("resultSize");
-    }
-
-    public void setJvmGcTime(long jvmGcTime) {
-        this.jvmGcTime = jvmGcTime;
-        this.valueChanged("jvmGcTime");
-    }
-
-    public void setResultSerializationTime(long resultSerializationTime) {
-        this.resultSerializationTime = resultSerializationTime;
-        this.valueChanged("resultSerializationTime");
-    }
-
-    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
-        this.memoryBytesSpilled = memoryBytesSpilled;
-        this.valueChanged("memoryBytesSpilled");
-    }
-
-    public void setDiskBytesSpilled(long diskBytesSpilled) {
-        this.diskBytesSpilled = diskBytesSpilled;
-        this.valueChanged("diskBytesSpilled");
-    }
-
-    public void setInputBytes(long inputBytes) {
-        this.inputBytes = inputBytes;
-        this.valueChanged("inputBytes");
-    }
-
-    public void setInputRecords(long inputRecords) {
-        this.inputRecords = inputRecords;
-        this.valueChanged("inputRecords");
-    }
-
-    public void setOutputBytes(long outputBytes) {
-        this.outputBytes = outputBytes;
-        this.valueChanged("outputBytes");
-    }
-
-    public void setOutputRecords(long outputRecords) {
-        this.outputRecords = outputRecords;
-        this.valueChanged("outputRecords");
-    }
-
-
-
-    public void setShuffleReadRecords(long shuffleReadRecords) {
-        this.shuffleReadRecords = shuffleReadRecords;
-        this.valueChanged("shuffleReadRecords");
-    }
-
-    public void setShuffleWriteBytes(long shuffleWriteBytes) {
-        this.shuffleWriteBytes = shuffleWriteBytes;
-        this.valueChanged("shuffleWriteBytes");
-    }
-
-    public void setShuffleWriteRecords(long shuffleWriteRecords) {
-        this.shuffleWriteRecords = shuffleWriteRecords;
-        this.valueChanged("shuffleWriteRecords");
-    }
-
-    public void setShuffleReadRemoteBytes(long shuffleReadRemoteBytes) {
-        this.shuffleReadRemoteBytes = shuffleReadRemoteBytes;
-        this.valueChanged("shuffleReadRemoteBytes");
-    }
-
-    public void setShuffleReadLocalBytes(long shuffleReadLocalBytes) {
-        this.shuffleReadLocalBytes = shuffleReadLocalBytes;
-        this.valueChanged("shuffleReadLocalBytes");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
index 6411018..3719325 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
@@ -18,14 +18,10 @@
 
 package org.apache.eagle.jpm.spark.running.parser;
 
-import org.apache.eagle.jpm.spark.crawl.EventType;
 import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
 import org.apache.eagle.jpm.spark.running.entities.*;
 import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.HDFSUtil;
-import org.apache.eagle.jpm.util.SparkJobTagName;
-import org.apache.eagle.jpm.util.Utils;
+import org.apache.eagle.jpm.util.*;
 import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
 import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
 import org.apache.eagle.jpm.util.resourcefetch.model.*;
@@ -219,7 +215,7 @@ public class SparkApplicationParser implements Runnable {
                     if (eventObj != null) {
                         String eventType = (String) eventObj.get("Event");
                         LOG.info("Event type: " + eventType);
-                        if (eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString())) {
+                        if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerEnvironmentUpdate.toString())) {
                             stop = true;
                             JSONObject sparkProps = (JSONObject) eventObj.get("Spark Properties");
                             for (Object key : sparkProps.keySet()) {
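
For context: the hunk above only swaps the module-local EventType enum for
the shared SparkEventType (and collapses several org.apache.eagle.jpm.util
imports into a wildcard); the surrounding event-log scan is unchanged. A
rough, self-contained sketch of that check, assuming the json-simple
JSONObject the parser appears to use (the helper class itself is
hypothetical, not part of the commit):

    import org.apache.eagle.jpm.util.SparkEventType;
    import org.json.simple.JSONObject;
    import org.json.simple.parser.JSONParser;
    import org.json.simple.parser.ParseException;

    public class EnvironmentUpdateCheck {
        // Returns true when a Spark event-log line is the environment-update
        // event whose "Spark Properties" block carries the job configuration.
        static boolean isEnvironmentUpdate(String eventLogLine) throws ParseException {
            JSONObject eventObj = (JSONObject) new JSONParser().parse(eventLogLine);
            String eventType = (String) eventObj.get("Event");
            return eventType != null
                    && eventType.equalsIgnoreCase(
                            SparkEventType.SparkListenerEnvironmentUpdate.toString());
        }
    }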

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
index 9d9f622..4d07b38 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
@@ -14,6 +14,9 @@
 # limitations under the License.
 
 {
+  "appId":"sparkRunningJob",
+  "mode":"LOCAL",
+  "workers" : 3,
   "envContextConfig" : {
     "stormConfigFile" : "storm.yaml",
     "parallelismConfig" : {
@@ -24,7 +27,6 @@
       "sparkRunningJobFetchSpout" : 1,
       "sparkRunningJobParseBolt" : 4
     },
-    "workers" : 2
   },
   "jobExtractorConfig" : {
     "site" : "sandbox",
@@ -48,8 +50,6 @@
     "zkRetryTimes" : 3,
     "zkRetryInterval" : 20000
   },
-  "appId":"sparkRunningJob",
-  "mode":"LOCAL",
   "eagleProps" : {
     "mailHost" : "abc.com",
     "mailDebug" : "true",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEventType.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEventType.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEventType.java
new file mode 100644
index 0000000..7c9f625
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEventType.java
@@ -0,0 +1,25 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+public enum SparkEventType {
+    SparkListenerBlockManagerAdded, SparkListenerEnvironmentUpdate, SparkListenerApplicationStart,
+    SparkListenerExecutorAdded, SparkListenerJobStart,SparkListenerStageSubmitted, SparkListenerTaskStart,SparkListenerBlockManagerRemoved,
+    SparkListenerTaskEnd, SparkListenerStageCompleted, SparkListenerJobEnd, SparkListenerApplicationEnd,SparkListenerExecutorRemoved
+}
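
The constant names appear to mirror the "Event" strings Spark writes into
its event log, so a raw log value can be matched against the enum directly,
as SparkApplicationParser now does. One hedged caveat: Enum.valueOf is
case-sensitive, which is presumably why the parser compares via
equalsIgnoreCase instead.

    String event = "SparkListenerJobEnd";             // hypothetical log value
    SparkEventType t = SparkEventType.valueOf(event); // exact-case match only;
                                                      // the parser's
                                                      // equalsIgnoreCase check is
                                                      // the tolerant alternative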

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
index b1881ef..61b2fee 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
@@ -107,7 +107,7 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
 
     private String getMRFinishedJobURL(String lastFinishedTime) {
         String url = URLUtil.removeTrailingSlash(selector.getSelectedUrl());
-        return url + "/" + "Constants.V2_APPS_URL"
+        return url + "/" + Constants.V2_APPS_URL
                 + "?applicationTypes=MAPREDUCE&state=FINISHED&finishedTimeBegin="
                 + lastFinishedTime + "&" + Constants.ANONYMOUS_PARAMETER;
     }
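
The bug fixed here: the constant name had been committed inside string
quotes, so the generated URL contained the literal text
"Constants.V2_APPS_URL" instead of the value the constant holds (its actual
path is defined in Constants and is not shown in this diff). The fixed
method, with the contrast spelled out in comments:

    private String getMRFinishedJobURL(String lastFinishedTime) {
        String url = URLUtil.removeTrailingSlash(selector.getSelectedUrl());
        // before: url + "/" + "Constants.V2_APPS_URL" + ...
        //   -> http://<rm-host>/Constants.V2_APPS_URL?applicationTypes=... (broken)
        // after: the constant contributes its value, presumably the RM apps
        // REST path, so the ResourceManager can actually resolve the request:
        return url + "/" + Constants.V2_APPS_URL
                + "?applicationTypes=MAPREDUCE&state=FINISHED&finishedTimeBegin="
                + lastFinishedTime + "&" + Constants.ANONYMOUS_PARAMETER;
    }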


[2/3] incubator-eagle git commit: Update spark history job feeder config & refactor the code

Posted by qi...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
new file mode 100644
index 0000000..211d6b7
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
@@ -0,0 +1,191 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import java.util.List;
+
+@Table("eagleSparkRunningJobs")
+@ColumnFamily("f")
+@Prefix("sparkJob")
+@Service(Constants.RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId","user", "queue"})
+@Partition({"site"})
+public class SparkJobEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private long  submissionTime;
+    @Column("b")
+    private long completionTime;
+    @Column("c")
+    private int numStages = 0;
+    @Column("d")
+    private String status;
+    @Column("e")
+    private int numTask = 0;
+    @Column("f")
+    private int numActiveTasks = 0;
+    @Column("g")
+    private int numCompletedTasks = 0;
+    @Column("h")
+    private int numSkippedTasks = 0;
+    @Column("i")
+    private int numFailedTasks = 0;
+    @Column("j")
+    private int numActiveStages = 0;
+    @Column("k")
+    private int numCompletedStages = 0;
+    @Column("l")
+    private int numSkippedStages = 0;
+    @Column("m")
+    private int numFailedStages = 0;
+    @Column("n")
+    private List<Integer> stages;
+
+    public List<Integer> getStages() {
+        return stages;
+    }
+
+    public void setStages(List<Integer> stages) {
+        this.stages = stages;
+        this.valueChanged("stages");
+    }
+
+    public long getSubmissionTime() {
+        return submissionTime;
+    }
+
+    public long getCompletionTime() {
+        return completionTime;
+    }
+
+    public int getNumStages() {
+        return numStages;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public int getNumTask() {
+        return numTask;
+    }
+
+    public int getNumActiveTasks() {
+        return numActiveTasks;
+    }
+
+    public int getNumCompletedTasks() {
+        return numCompletedTasks;
+    }
+
+    public int getNumSkippedTasks() {
+        return numSkippedTasks;
+    }
+
+    public int getNumFailedTasks() {
+        return numFailedTasks;
+    }
+
+    public int getNumActiveStages() {
+        return numActiveStages;
+    }
+
+    public int getNumCompletedStages() {
+        return numCompletedStages;
+    }
+
+    public int getNumSkippedStages() {
+        return numSkippedStages;
+    }
+
+    public int getNumFailedStages() {
+        return numFailedStages;
+    }
+
+    public void setSubmissionTime(long submissionTime) {
+        this.submissionTime = submissionTime;
+        this.valueChanged("submissionTime");
+    }
+
+    public void setCompletionTime(long completionTime) {
+        this.completionTime = completionTime;
+        this.valueChanged("completionTime");
+    }
+
+    public void setNumStages(int numStages) {
+        this.numStages = numStages;
+        this.valueChanged("numStages");
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        this.valueChanged("status");
+    }
+
+    public void setNumTask(int numTask) {
+        this.numTask = numTask;
+        this.valueChanged("numTask");
+    }
+
+    public void setNumActiveTasks(int numActiveTasks) {
+        this.numActiveTasks = numActiveTasks;
+        this.valueChanged("numActiveTasks");
+    }
+
+    public void setNumCompletedTasks(int numCompletedTasks) {
+        this.numCompletedTasks = numCompletedTasks;
+        this.valueChanged("numCompletedTasks");
+    }
+
+    public void setNumSkippedTasks(int numSkippedTasks) {
+        this.numSkippedTasks = numSkippedTasks;
+        this.valueChanged("numSkippedTasks");
+    }
+
+    public void setNumFailedTasks(int numFailedTasks) {
+        this.numFailedTasks = numFailedTasks;
+        this.valueChanged("numFailedTasks");
+    }
+
+    public void setNumActiveStages(int numActiveStages) {
+        this.numActiveStages = numActiveStages;
+        this.valueChanged("numActiveStages");
+    }
+
+    public void setNumCompletedStages(int numCompletedStages) {
+        this.numCompletedStages = numCompletedStages;
+        this.valueChanged("numCompletedStages");
+    }
+
+    public void setNumSkippedStages(int numSkippedStages) {
+        this.numSkippedStages = numSkippedStages;
+        this.valueChanged("numSkippedStages");
+    }
+
+    public void setNumFailedStages(int numFailedStages) {
+        this.numFailedStages = numFailedStages;
+        this.valueChanged("numFailedStages");
+    }
+}
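
For reference, these running-app entities are plain TaggedLogAPIEntity beans: each setter calls valueChanged(...) so the storage layer can tell which qualifiers were actually modified and serialize only those. A minimal usage sketch of the SparkJobEntity defined above (the tag values are made up for illustration and are not part of this patch):

    import org.apache.eagle.jpm.spark.running.entities.SparkJobEntity;

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch: populate a running-job entity the way a feeder might.
    public class SparkJobEntityExample {
        public static void main(String[] args) {
            SparkJobEntity job = new SparkJobEntity();
            Map<String, String> tags = new HashMap<>();
            tags.put("site", "sandbox");                              // hypothetical site
            tags.put("sparkAppId", "application_1473200000000_0001"); // hypothetical app id
            tags.put("jobId", "0");
            job.setTags(tags);
            job.setTimestamp(System.currentTimeMillis());
            job.setSubmissionTime(System.currentTimeMillis());
            job.setStatus("RUNNING"); // the setter records the change via valueChanged("status")
        }
    }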

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
new file mode 100644
index 0000000..0194132
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
@@ -0,0 +1,299 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@Table("eagleSparkRunningStages")
+@ColumnFamily("f")
+@Prefix("sparkStage")
+@Service(Constants.RUNNING_SPARK_STAGE_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", "stageId","stageAttemptId","user", "queue"})
+@Partition({"site"})
+public class SparkStageEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private String status;
+    @Column("b")
+    private int numActiveTasks = 0;
+    @Column("c")
+    private int numCompletedTasks = 0;
+    @Column("d")
+    private int numFailedTasks = 0;
+    @Column("e")
+    private long executorRunTime = 0L;
+    @Column("f")
+    private long inputBytes = 0L;
+    @Column("g")
+    private long inputRecords = 0L;
+    @Column("h")
+    private long outputBytes = 0L;
+    @Column("i")
+    private long outputRecords = 0L;
+    @Column("j")
+    private long shuffleReadBytes = 0L;
+    @Column("k")
+    private long shuffleReadRecords = 0L;
+    @Column("l")
+    private long shuffleWriteBytes = 0L;
+    @Column("m")
+    private long shuffleWriteRecords = 0L;
+    @Column("n")
+    private long memoryBytesSpilled = 0L;
+    @Column("o")
+    private long diskBytesSpilled = 0L;
+    @Column("p")
+    private String name;
+    @Column("q")
+    private String schedulingPool;
+    @Column("r")
+    private long submitTime;
+    @Column("s")
+    private long completeTime;
+    @Column("t")
+    private int numTasks;
+    @Column("u")
+    private long executorDeserializeTime;
+    @Column("v")
+    private long resultSize;
+    @Column("w")
+    private long jvmGcTime;
+    @Column("x")
+    private long resultSerializationTime;
+
+    public String getStatus() {
+        return status;
+    }
+
+    public int getNumActiveTasks() {
+        return numActiveTasks;
+    }
+
+    public int getNumCompletedTasks() {
+        return numCompletedTasks;
+    }
+
+    public int getNumFailedTasks() {
+        return numFailedTasks;
+    }
+
+    public long getExecutorRunTime() {
+        return executorRunTime;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public long getOutputRecords() {
+        return outputRecords;
+    }
+
+    public long getShuffleReadBytes() {
+        return shuffleReadBytes;
+    }
+
+    public long getShuffleReadRecords() {
+        return shuffleReadRecords;
+    }
+
+    public long getShuffleWriteBytes() {
+        return shuffleWriteBytes;
+    }
+
+    public long getShuffleWriteRecords() {
+        return shuffleWriteRecords;
+    }
+
+    public long getMemoryBytesSpilled() {
+        return memoryBytesSpilled;
+    }
+
+    public long getDiskBytesSpilled() {
+        return diskBytesSpilled;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getSchedulingPool() {
+        return schedulingPool;
+    }
+
+    public long getSubmitTime() {
+        return submitTime;
+    }
+
+    public long getCompleteTime() {
+        return completeTime;
+    }
+
+    public int getNumTasks() {
+        return numTasks;
+    }
+
+    public long getExecutorDeserializeTime() {
+        return executorDeserializeTime;
+    }
+
+    public long getResultSize() {
+        return resultSize;
+    }
+
+    public long getJvmGcTime() {
+        return jvmGcTime;
+    }
+
+    public long getResultSerializationTime() {
+        return resultSerializationTime;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        this.valueChanged("status");
+    }
+
+    public void setNumActiveTasks(int numActiveTasks) {
+        this.numActiveTasks = numActiveTasks;
+        this.valueChanged("numActiveTasks");
+    }
+
+    public void setNumCompletedTasks(int numCompletedTasks) {
+        this.numCompletedTasks = numCompletedTasks;
+        this.valueChanged("numCompletedTasks");
+    }
+
+    public void setNumFailedTasks(int numFailedTasks) {
+        this.numFailedTasks = numFailedTasks;
+        this.valueChanged("numFailedTasks");
+    }
+
+    public void setExecutorRunTime(long executorRunTime) {
+        this.executorRunTime = executorRunTime;
+        this.valueChanged("executorRunTime");
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+        this.valueChanged("inputBytes");
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+        this.valueChanged("inputRecords");
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+        this.valueChanged("outputBytes");
+    }
+
+    public void setOutputRecords(long outputRecords) {
+        this.outputRecords = outputRecords;
+        this.valueChanged("outputRecords");
+    }
+
+    public void setShuffleReadBytes(long shuffleReadBytes) {
+        this.shuffleReadBytes = shuffleReadBytes;
+        this.valueChanged("shuffleReadBytes");
+    }
+
+    public void setShuffleReadRecords(long shuffleReadRecords) {
+        this.shuffleReadRecords = shuffleReadRecords;
+        this.valueChanged("shuffleReadRecords");
+    }
+
+    public void setShuffleWriteBytes(long shuffleWriteBytes) {
+        this.shuffleWriteBytes = shuffleWriteBytes;
+        this.valueChanged("shuffleWriteBytes");
+    }
+
+    public void setShuffleWriteRecords(long shuffleWriteRecords) {
+        this.shuffleWriteRecords = shuffleWriteRecords;
+        this.valueChanged("shuffleWriteRecords");
+    }
+
+    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+        this.memoryBytesSpilled = memoryBytesSpilled;
+        this.valueChanged("memoryBytesSpilled");
+    }
+
+    public void setDiskBytesSpilled(long diskBytesSpilled) {
+        this.diskBytesSpilled = diskBytesSpilled;
+        this.valueChanged("diskBytesSpilled");
+    }
+
+    public void setName(String name) {
+        this.name = name;
+        this.valueChanged("name");
+    }
+
+    public void setSchedulingPool(String schedulingPool) {
+        this.schedulingPool = schedulingPool;
+        this.valueChanged("schedulingPool");
+    }
+
+    public void setSubmitTime(long submitTime) {
+        this.submitTime = submitTime;
+        this.valueChanged("submitTime");
+    }
+
+    public void setCompleteTime(long completeTime) {
+        this.completeTime = completeTime;
+        this.valueChanged("completeTime");
+    }
+
+    public void setNumTasks(int numTasks) {
+        this.numTasks = numTasks;
+        valueChanged("numTasks");
+    }
+
+    public void setExecutorDeserializeTime(long executorDeserializeTime) {
+        this.executorDeserializeTime = executorDeserializeTime;
+        valueChanged("executorDeserializeTime");
+    }
+
+    public void setResultSize(long resultSize) {
+        this.resultSize = resultSize;
+        valueChanged("resultSize");
+    }
+
+    public void setJvmGcTime(long jvmGcTime) {
+        this.jvmGcTime = jvmGcTime;
+        valueChanged("jvmGcTime");
+    }
+
+    public void setResultSerializationTime(long resultSerializationTime) {
+        this.resultSerializationTime = resultSerializationTime;
+        valueChanged("resultSerializationTime");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
new file mode 100644
index 0000000..6522c3c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
@@ -0,0 +1,290 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@Table("eagleSparkRunningTasks")
+@ColumnFamily("f")
+@Prefix("sparkTask")
+@Service(Constants.RUNNING_SPARK_TASK_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", "jobName", "stageId","stageAttemptId","taskIndex","taskAttemptId","user", "queue"})
+@Partition({"site"})
+public class SparkTaskEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private int taskId;
+    @Column("b")
+    private long launchTime;
+    @Column("c")
+    private String executorId;
+    @Column("d")
+    private String host;
+    @Column("e")
+    private String taskLocality;
+    @Column("f")
+    private boolean speculative;
+    @Column("g")
+    private long executorDeserializeTime;
+    @Column("h")
+    private long executorRunTime;
+    @Column("i")
+    private long resultSize;
+    @Column("j")
+    private long jvmGcTime;
+    @Column("k")
+    private long resultSerializationTime;
+    @Column("l")
+    private long memoryBytesSpilled;
+    @Column("m")
+    private long diskBytesSpilled;
+    @Column("n")
+    private long inputBytes;
+    @Column("o")
+    private long inputRecords;
+    @Column("p")
+    private long outputBytes;
+    @Column("q")
+    private long outputRecords;
+    @Column("r")
+    private long shuffleReadRemoteBytes;
+    @Column("x")
+    private long shuffleReadLocalBytes;
+    @Column("s")
+    private long shuffleReadRecords;
+    @Column("t")
+    private long shuffleWriteBytes;
+    @Column("u")
+    private long shuffleWriteRecords;
+    @Column("v")
+    private boolean failed;
+
+    public int getTaskId() {
+        return taskId;
+    }
+
+    public long getLaunchTime() {
+        return launchTime;
+    }
+
+    public String getExecutorId() {
+        return executorId;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public String getTaskLocality() {
+        return taskLocality;
+    }
+
+    public boolean isSpeculative() {
+        return speculative;
+    }
+
+    public long getExecutorDeserializeTime() {
+        return executorDeserializeTime;
+    }
+
+    public long getExecutorRunTime() {
+        return executorRunTime;
+    }
+
+    public long getResultSize() {
+        return resultSize;
+    }
+
+    public long getJvmGcTime() {
+        return jvmGcTime;
+    }
+
+    public long getResultSerializationTime() {
+        return resultSerializationTime;
+    }
+
+    public long getMemoryBytesSpilled() {
+        return memoryBytesSpilled;
+    }
+
+    public long getDiskBytesSpilled() {
+        return diskBytesSpilled;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public long getOutputRecords() {
+        return outputRecords;
+    }
+
+    public long getShuffleReadRecords() {
+        return shuffleReadRecords;
+    }
+
+    public long getShuffleWriteBytes() {
+        return shuffleWriteBytes;
+    }
+
+    public long getShuffleWriteRecords() {
+        return shuffleWriteRecords;
+    }
+
+    public boolean isFailed() {
+        return failed;
+    }
+
+    public long getShuffleReadRemoteBytes() {
+        return shuffleReadRemoteBytes;
+    }
+
+    public long getShuffleReadLocalBytes() {
+        return shuffleReadLocalBytes;
+    }
+
+    public void setFailed(boolean failed) {
+        this.failed = failed;
+        valueChanged("failed");
+    }
+
+    public void setTaskId(int taskId) {
+        this.taskId = taskId;
+        valueChanged("taskId");
+    }
+
+    public void setLaunchTime(long launchTime) {
+        this.launchTime = launchTime;
+        valueChanged("launchTime");
+    }
+
+    public void setExecutorId(String executorId) {
+        this.executorId = executorId;
+        valueChanged("executorId");
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+        this.valueChanged("host");
+    }
+
+    public void setTaskLocality(String taskLocality) {
+        this.taskLocality = taskLocality;
+        this.valueChanged("taskLocality");
+    }
+
+    public void setSpeculative(boolean speculative) {
+        this.speculative = speculative;
+        this.valueChanged("speculative");
+    }
+
+    public void setExecutorDeserializeTime(long executorDeserializeTime) {
+        this.executorDeserializeTime = executorDeserializeTime;
+        this.valueChanged("executorDeserializeTime");
+    }
+
+    public void setExecutorRunTime(long executorRunTime) {
+        this.executorRunTime = executorRunTime;
+        this.valueChanged("executorRunTime");
+    }
+
+    public void setResultSize(long resultSize) {
+        this.resultSize = resultSize;
+        this.valueChanged("resultSize");
+    }
+
+    public void setJvmGcTime(long jvmGcTime) {
+        this.jvmGcTime = jvmGcTime;
+        this.valueChanged("jvmGcTime");
+    }
+
+    public void setResultSerializationTime(long resultSerializationTime) {
+        this.resultSerializationTime = resultSerializationTime;
+        this.valueChanged("resultSerializationTime");
+    }
+
+    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+        this.memoryBytesSpilled = memoryBytesSpilled;
+        this.valueChanged("memoryBytesSpilled");
+    }
+
+    public void setDiskBytesSpilled(long diskBytesSpilled) {
+        this.diskBytesSpilled = diskBytesSpilled;
+        this.valueChanged("diskBytesSpilled");
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+        this.valueChanged("inputBytes");
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+        this.valueChanged("inputRecords");
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+        this.valueChanged("outputBytes");
+    }
+
+    public void setOutputRecords(long outputRecords) {
+        this.outputRecords = outputRecords;
+        this.valueChanged("outputRecords");
+    }
+
+    public void setShuffleReadRecords(long shuffleReadRecords) {
+        this.shuffleReadRecords = shuffleReadRecords;
+        this.valueChanged("shuffleReadRecords");
+    }
+
+    public void setShuffleWriteBytes(long shuffleWriteBytes) {
+        this.shuffleWriteBytes = shuffleWriteBytes;
+        this.valueChanged("shuffleWriteBytes");
+    }
+
+    public void setShuffleWriteRecords(long shuffleWriteRecords) {
+        this.shuffleWriteRecords = shuffleWriteRecords;
+        this.valueChanged("shuffleWriteRecords");
+    }
+
+    public void setShuffleReadRemoteBytes(long shuffleReadRemoteBytes) {
+        this.shuffleReadRemoteBytes = shuffleReadRemoteBytes;
+        this.valueChanged("shuffleReadRemoteBytes");
+    }
+
+    public void setShuffleReadLocalBytes(long shuffleReadLocalBytes) {
+        this.shuffleReadLocalBytes = shuffleReadLocalBytes;
+        this.valueChanged("shuffleReadLocalBytes");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
index 0fc74d7..284eeee 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -82,8 +82,6 @@ public class SparkHistoryJobAppConfig implements Serializable {
         this.eagleInfo.host = config.getString("eagleProps.eagle.service.host");
         this.eagleInfo.port = config.getInt("eagleProps.eagle.service.port");
 
-        this.stormConfig.topologyName = config.getString("storm.name");
-        this.stormConfig.workerNo = config.getInt("storm.worker.num");
         this.stormConfig.timeoutSec = config.getInt("storm.messageTimeoutSec");
         this.stormConfig.spoutPending = config.getInt("storm.pendingSpout");
         this.stormConfig.spoutCrawlInterval = config.getInt("storm.spoutCrawlInterval");
@@ -117,9 +115,7 @@ public class SparkHistoryJobAppConfig implements Serializable {
     }
 
     public static class StormConfig implements Serializable {
-        public int workerNo;
         public int timeoutSec;
-        public String topologyName;
         public int spoutPending;
         public int spoutCrawlInterval;
     }
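
The removed storm.name and storm.worker.num keys are no longer read by the feeder itself; presumably the application framework now supplies the topology name and worker count. The remaining Storm settings keep the same key names and are still read through Typesafe Config. A minimal sketch of that wiring (standalone, assuming the same application.conf keys):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    // Sketch: the keys left in StormConfig after this change, read the same
    // way SparkHistoryJobAppConfig reads them from application.conf.
    public class StormConfigExample {
        public static void main(String[] args) {
            Config config = ConfigFactory.load();
            int timeoutSec = config.getInt("storm.messageTimeoutSec");
            int spoutPending = config.getInt("storm.pendingSpout");
            int spoutCrawlInterval = config.getInt("storm.spoutCrawlInterval");
            System.out.printf("timeout=%ds pending=%d crawlInterval=%ds%n",
                    timeoutSec, spoutPending, spoutCrawlInterval);
        }
    }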

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
new file mode 100644
index 0000000..b73b52e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import java.io.InputStream;
+
+public interface JHFInputStreamReader {
+    void read(InputStream is) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
new file mode 100644
index 0000000..047e2d5
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import java.io.InputStream;
+
+public interface JHFParserBase {
+    /**
+     * Parses the given input stream; implementations must ensure the stream is closed.
+     * @param is the input stream to parse
+     * @throws Exception if reading or parsing the stream fails
+     */
+    void parse(InputStream is) throws Exception;
+}
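
The contract above leaves stream lifecycle to the implementation. Since Spark event logs are line-delimited JSON (one listener event per line), a parser can feed each line to the JHFSparkEventReader below and rely on try-with-resources to close the stream. A hypothetical implementation sketch (this class is not part of the patch; it assumes the same crawl package):

    package org.apache.eagle.jpm.spark.history.crawl;

    import org.json.simple.JSONObject;
    import org.json.simple.parser.JSONParser;

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    // Hypothetical parser illustrating the JHFParserBase contract: the input
    // stream is always closed, as the interface javadoc requires.
    public class ExampleSparkLogParser implements JHFParserBase {
        private final JHFSparkEventReader reader;

        public ExampleSparkLogParser(JHFSparkEventReader reader) {
            this.reader = reader;
        }

        @Override
        public void parse(InputStream is) throws Exception {
            JSONParser jsonParser = new JSONParser();
            try (BufferedReader in = new BufferedReader(new InputStreamReader(is))) {
                String line;
                while ((line = in.readLine()) != null) {
                    reader.read((JSONObject) jsonParser.parse(line)); // one Spark event per line
                }
                reader.clearReader(); // finalize aggregates and flush remaining entities
            }
        }
    }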

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
new file mode 100644
index 0000000..571620a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.eagle.jpm.spark.entity.*;
+import org.apache.eagle.jpm.util.*;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.service.client.EagleServiceClientException;
+import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class JHFSparkEventReader {
+    private static final Logger LOG = LoggerFactory.getLogger(JHFSparkEventReader.class);
+
+    private static final int FLUSH_LIMIT = 500;
+    private long firstTaskLaunchTime;
+    private long lastEventTime;
+
+    private Map<String, SparkExecutor> executors;
+    private SparkApp app;
+    private Map<Integer, SparkJob> jobs;
+    private Map<String, SparkStage> stages;
+    private Map<Integer, Set<String>> jobStageMap;
+    private Map<Long, SparkTask> tasks;
+    private EagleServiceClientImpl client;
+    private Map<String, Map<Integer, Boolean>> stageTaskStatusMap;
+
+    private List<TaggedLogAPIEntity> createEntities;
+
+    private Config conf;
+
+    public JHFSparkEventReader(Map<String, String> baseTags, SparkApplicationInfo info) {
+        app = new SparkApp();
+        app.setTags(new HashMap<String, String>(baseTags));
+        app.setYarnState(info.getState());
+        app.setYarnStatus(info.getFinalStatus());
+        createEntities = new ArrayList<>();
+        jobs = new HashMap<>();
+        stages = new HashMap<>();
+        jobStageMap = new HashMap<>();
+        tasks = new HashMap<>();
+        executors = new HashMap<>();
+        stageTaskStatusMap = new HashMap<>();
+        conf = ConfigFactory.load();
+        this.initiateClient();
+    }
+
+    public SparkApp getApp() {
+        return this.app;
+    }
+
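+    // Dispatch one Spark listener event (one JSON object per event-log line)
+    // to its type-specific handler; unknown event types are only logged.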
+    public void read(JSONObject eventObj) {
+        String eventType = (String) eventObj.get("Event");
+        if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerApplicationStart.toString())) {
+            handleAppStarted(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerEnvironmentUpdate.toString())) {
+            handleEnvironmentSet(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerExecutorAdded.toString())) {
+            handleExecutorAdd(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerBlockManagerAdded.toString())) {
+            handleBlockManagerAdd(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerJobStart.toString())) {
+            handleJobStart(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerStageSubmitted.toString())) {
+            handleStageSubmit(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerTaskStart.toString())) {
+            handleTaskStart(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerTaskEnd.toString())) {
+            handleTaskEnd(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerStageCompleted.toString())) {
+            handleStageComplete(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerJobEnd.toString())) {
+            handleJobEnd(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerExecutorRemoved.toString())) {
+            handleExecutorRemoved(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerApplicationEnd.toString())) {
+            handleAppEnd(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerBlockManagerRemoved.toString())) {
+            //nothing to do now
+        } else {
+            LOG.info("Not registered event type:" + eventType);
+        }
+    }
+
+    private void handleEnvironmentSet(JSONObject event) {
+        app.setConfig(new JobConfig());
+        JSONObject sparkProps = (JSONObject) event.get("Spark Properties");
+
+        String[] additionalJobConf = conf.getString("basic.jobConf.additional.info").split(",\\s*");
+        String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port",
+            "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory",
+            "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"};
+        String[] jobConf = (String[]) ArrayUtils.addAll(additionalJobConf, props);
+        for (String prop : jobConf) {
+            if (sparkProps.containsKey(prop)) {
+                app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop));
+            }
+        }
+    }
+
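+    // Look up a job configuration value, falling back to the feeder's own
+    // "spark.defaultVal.*" defaults when the job did not set it.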
+    private Object getConfigVal(JobConfig config, String configName, String type) {
+        if (config.getConfig().containsKey(configName)) {
+            Object val = config.getConfig().get(configName);
+            if (type.equalsIgnoreCase(Integer.class.getName())) {
+                return Integer.parseInt((String) val);
+            } else {
+                return val;
+            }
+        } else {
+            if (type.equalsIgnoreCase(Integer.class.getName())) {
+                return conf.getInt("spark.defaultVal." + configName);
+            } else {
+                return conf.getString("spark.defaultVal." + configName);
+            }
+        }
+    }
+
+    private boolean isClientMode(JobConfig config) {
+        return config.getConfig().get("spark.master").equalsIgnoreCase("yarn-client");
+    }
+
+    private void handleAppStarted(JSONObject event) {
+        //need update all entities tag before app start
+        List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
+        entities.addAll(this.executors.values());
+        entities.add(this.app);
+
+        long appStartTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
+        for (TaggedLogAPIEntity entity : entities) {
+            entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), JSONUtils.getString(event, "App ID"));
+            entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), JSONUtils.getString(event, "App Name"));
+            // In yarn-client mode, attemptId is not available in the log, so we set attemptId = 1.
+            String attemptId = isClientMode(this.app.getConfig()) ? "1" : JSONUtils.getString(event, "App Attempt ID");
+            entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), attemptId);
+            // The second argument of getNormalizedName() is null here because the original code contained sensitive text;
+            // it originally looked like: this.app.getConfig().getConfig().get("xxx"), where "xxx" was the sensitive key.
+            entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), this.getNormalizedName(JSONUtils.getString(event, "App Name"), null));
+            entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), JSONUtils.getString(event, "User"));
+
+            entity.setTimestamp(appStartTime);
+        }
+
+        this.app.setStartTime(appStartTime);
+        this.lastEventTime = appStartTime;
+    }
+
+    private void handleExecutorAdd(JSONObject event) {
+        String executorID = (String) event.get("Executor ID");
+        long executorAddTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
+        this.lastEventTime = executorAddTime;
+        SparkExecutor executor = this.initiateExecutor(executorID, executorAddTime);
+
+        // "Executor Info" is parsed here but not consumed yet.
+        JSONObject executorInfo = JSONUtils.getJSONObject(event, "Executor Info");
+    }
+
+    private void handleBlockManagerAdd(JSONObject event) {
+        long maxMemory = JSONUtils.getLong(event, "Maximum Memory");
+        long timestamp = JSONUtils.getLong(event, "Timestamp", lastEventTime);
+        this.lastEventTime = timestamp;
+        JSONObject blockInfo = JSONUtils.getJSONObject(event, "Block Manager ID");
+        String executorID = JSONUtils.getString(blockInfo, "Executor ID");
+        String hostAndPort = JSONUtils.getString(blockInfo, "Host") + ":" + JSONUtils.getLong(blockInfo, "Port");
+
+        SparkExecutor executor = this.initiateExecutor(executorID, timestamp);
+        executor.setMaxMemory(maxMemory);
+        executor.setHostPort(hostAndPort);
+    }
+
+    private void handleTaskStart(JSONObject event) {
+        this.initializeTask(event);
+    }
+
+    private void handleTaskEnd(JSONObject event) {
+        JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
+        long taskId = JSONUtils.getLong(taskInfo, "Task ID");
+        SparkTask task = tasks.get(taskId);
+        if (task == null) {
+            return;
+        }
+
+        task.setFailed(JSONUtils.getBoolean(taskInfo, "Failed"));
+        JSONObject taskMetrics = JSONUtils.getJSONObject(event, "Task Metrics");
+        if (null != taskMetrics) {
+            task.setExecutorDeserializeTime(JSONUtils.getLong(taskMetrics, "Executor Deserialize Time", lastEventTime));
+            task.setExecutorRunTime(JSONUtils.getLong(taskMetrics, "Executor Run Time", lastEventTime));
+            task.setJvmGcTime(JSONUtils.getLong(taskMetrics, "JVM GC Time", lastEventTime));
+            task.setResultSize(JSONUtils.getLong(taskMetrics, "Result Size"));
+            task.setResultSerializationTime(JSONUtils.getLong(taskMetrics, "Result Serialization Time", lastEventTime));
+            task.setMemoryBytesSpilled(JSONUtils.getLong(taskMetrics, "Memory Bytes Spilled"));
+            task.setDiskBytesSpilled(JSONUtils.getLong(taskMetrics, "Disk Bytes Spilled"));
+
+            JSONObject inputMetrics = JSONUtils.getJSONObject(taskMetrics, "Input Metrics");
+            if (null != inputMetrics) {
+                task.setInputBytes(JSONUtils.getLong(inputMetrics, "Bytes Read"));
+                task.setInputRecords(JSONUtils.getLong(inputMetrics, "Records Read"));
+            }
+
+            JSONObject outputMetrics = JSONUtils.getJSONObject(taskMetrics, "Output Metrics");
+            if (null != outputMetrics) {
+                task.setOutputBytes(JSONUtils.getLong(outputMetrics, "Bytes Written"));
+                task.setOutputRecords(JSONUtils.getLong(outputMetrics, "Records Written"));
+            }
+
+            JSONObject shuffleWriteMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Write Metrics");
+            if (null != shuffleWriteMetrics) {
+                task.setShuffleWriteBytes(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Bytes Written"));
+                task.setShuffleWriteRecords(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Records Written"));
+            }
+
+            JSONObject shuffleReadMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Read Metrics");
+            if (null != shuffleReadMetrics) {
+                task.setShuffleReadLocalBytes(JSONUtils.getLong(shuffleReadMetrics, "Local Bytes Read"));
+                task.setShuffleReadRemoteBytes(JSONUtils.getLong(shuffleReadMetrics, "Remote Bytes Read"));
+                task.setShuffleReadRecords(JSONUtils.getLong(shuffleReadMetrics, "Total Records Read"));
+            }
+        } else {
+            // tasks that succeeded but carry no task metrics are saved at the end if no other information arrives
+            if (!task.isFailed()) {
+                return;
+            }
+        }
+
+        aggregateToStage(task);
+        aggregateToExecutor(task);
+        tasks.remove(taskId);
+        this.flushEntities(task, false);
+    }
+
+    private SparkTask initializeTask(JSONObject event) {
+        SparkTask task = new SparkTask();
+        task.setTags(new HashMap<>(this.app.getTags()));
+        task.setTimestamp(app.getTimestamp());
+
+        task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage ID")));
+        task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage Attempt ID")));
+
+        JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
+        long taskId = JSONUtils.getLong(taskInfo, "Task ID");
+        task.setTaskId(taskId);
+
+        task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), Long.toString(JSONUtils.getLong(taskInfo, "Index")));
+        task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), Integer.toString(JSONUtils.getInt(taskInfo, "Attempt")));
+        long launchTime = JSONUtils.getLong(taskInfo, "Launch Time", lastEventTime);
+        this.lastEventTime = launchTime;
+        if (taskId == 0) {
+            this.setFirstTaskLaunchTime(launchTime);
+        }
+        task.setLaunchTime(launchTime);
+        task.setExecutorId(JSONUtils.getString(taskInfo, "Executor ID"));
+        task.setHost(JSONUtils.getString(taskInfo, "Host"));
+        task.setTaskLocality(JSONUtils.getString(taskInfo, "Locality"));
+        task.setSpeculative(JSONUtils.getBoolean(taskInfo, "Speculative"));
+
+        tasks.put(task.getTaskId(), task);
+        return task;
+    }
+
+    private void setFirstTaskLaunchTime(long launchTime) {
+        this.firstTaskLaunchTime = launchTime;
+    }
+
+    private void handleJobStart(JSONObject event) {
+        SparkJob job = new SparkJob();
+        job.setTags(new HashMap<>(this.app.getTags()));
+        job.setTimestamp(app.getTimestamp());
+
+        int jobId = JSONUtils.getInt(event, "Job ID");
+        job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
+        long submissionTime = JSONUtils.getLong(event, "Submission Time", lastEventTime);
+        job.setSubmissionTime(submissionTime);
+        this.lastEventTime = submissionTime;
+
+        //for a completed application there are no active stages/tasks
+        job.setNumActiveStages(0);
+        job.setNumActiveTasks(0);
+
+        this.jobs.put(jobId, job);
+        this.jobStageMap.put(jobId, new HashSet<String>());
+
+        JSONArray stages = JSONUtils.getJSONArray(event, "Stage Infos");
+        int stagesSize = (stages == null ? 0 : stages.size());
+        job.setNumStages(stagesSize);
+        for (int i = 0; i < stagesSize; i++) {
+            JSONObject stageInfo = (JSONObject) stages.get(i);
+            int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
+            int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
+            String stageName = JSONUtils.getString(stageInfo, "Stage Name");
+            int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks");
+            this.initiateStage(jobId, stageId, stageAttemptId, stageName, numTasks);
+        }
+    }
+
+    private void handleStageSubmit(JSONObject event) {
+        JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
+        int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
+        int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
+        String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
+        stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>());
+
+        if (!stages.containsKey(key)) {
+            // may be a later attempt of an already-seen stage
+            String baseAttempt = this.generateStageKey(Integer.toString(stageId), "0");
+            if (stages.containsKey(baseAttempt)) {
+                SparkStage stage = stages.get(baseAttempt);
+                String jobId = stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString());
+
+                String stageName = JSONUtils.getString(stageInfo, "Stage Name"); // "Stage Name" lives inside "Stage Info"
+                int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks");
+                this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks);
+            }
+        }
+    }
+
+    private void handleStageComplete(JSONObject event) {
+        JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
+        int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
+        int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
+        String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
+        SparkStage stage = stages.get(key);
+
+        // If "Submission Time" is not available, use the "Launch Time" of "Task ID" = 0.
+        Long submissionTime = JSONUtils.getLong(stageInfo, "Submission Time", firstTaskLaunchTime);
+
+        stage.setSubmitTime(submissionTime);
+
+        long completeTime = JSONUtils.getLong(stageInfo, "Completion Time", lastEventTime);
+        stage.setCompleteTime(completeTime);
+        this.lastEventTime = completeTime;
+
+        if (stageInfo != null && stageInfo.containsKey("Failure Reason")) {
+            stage.setStatus(SparkEntityConstant.SparkStageStatus.FAILED.toString());
+        } else {
+            stage.setStatus(SparkEntityConstant.SparkStageStatus.COMPLETE.toString());
+        }
+    }
+
+    private void handleExecutorRemoved(JSONObject event) {
+        String executorID = JSONUtils.getString(event, "Executor ID");
+        SparkExecutor executor = executors.get(executorID);
+        long removedTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
+        executor.setEndTime(removedTime);
+        this.lastEventTime = removedTime;
+    }
+
+    private void handleJobEnd(JSONObject event) {
+        int jobId = JSONUtils.getInt(event, "Job ID");
+        SparkJob job = jobs.get(jobId);
+
+        long completionTime = JSONUtils.getLong(event, "Completion Time", lastEventTime);
+        job.setCompletionTime(completionTime);
+        this.lastEventTime = completionTime;
+
+        JSONObject jobResult = JSONUtils.getJSONObject(event, "Job Result");
+        String result = JSONUtils.getString(jobResult, "Result");
+        if (result.equalsIgnoreCase("JobSucceeded")) {
+            job.setStatus(SparkEntityConstant.SparkJobStatus.SUCCEEDED.toString());
+        } else {
+            job.setStatus(SparkEntityConstant.SparkJobStatus.FAILED.toString());
+        }
+    }
+
+    private void handleAppEnd(JSONObject event) {
+        long endTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
+        app.setEndTime(endTime);
+        this.lastEventTime = endTime;
+    }
+
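+    // Invoked after the whole event log has been read: flush any tasks that
+    // never saw a task-end event, finalize stage/job/executor aggregates,
+    // derive memory/core settings from the config, and store the app entity.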
+    public void clearReader() throws Exception {
+        //clear tasks
+        for (SparkTask task : tasks.values()) {
+            LOG.info("Task {} does not have result or no task metrics.", task.getTaskId());
+            task.setFailed(true);
+            aggregateToStage(task);
+            aggregateToExecutor(task);
+            this.flushEntities(task, false);
+        }
+
+        List<SparkStage> needStoreStages = new ArrayList<>();
+        for (SparkStage stage : this.stages.values()) {
+            int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
+            if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) {
+                SparkJob job = this.jobs.get(jobId);
+                job.setNumSkippedStages(job.getNumSkippedStages() + 1);
+                job.setNumSkippedTasks(job.getNumSkippedTasks() + stage.getNumTasks());
+            } else {
+                this.aggregateToJob(stage);
+                this.aggregateStageToApp(stage);
+                needStoreStages.add(stage);
+            }
+            String stageId = stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
+            String stageAttemptId = stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
+            this.jobStageMap.get(jobId).remove(this.generateStageKey(stageId, stageAttemptId));
+        }
+
+        this.flushEntities(needStoreStages, false);
+        for (SparkJob job : jobs.values()) {
+            this.aggregateJobToApp(job);
+        }
+        this.flushEntities(jobs.values(), false);
+
+        app.setExecutors(executors.size());
+
+        long executorMemory = Utils.parseMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName()));
+        long driverMemory = Utils.parseMemory(this.isClientMode(app.getConfig())
+            ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName())
+            : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName()));
+
+        int executorCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName());
+        int driverCore = this.isClientMode(app.getConfig())
+            ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName())
+            : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName());
+
+        long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead");
+        long driverMemoryOverhead = this.isClientMode(app.getConfig())
+            ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead")
+            : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead");
+
+        app.setExecMemoryBytes(executorMemory);
+        app.setDriveMemoryBytes(driverMemory);
+        app.setExecutorCores(executorCore);
+        app.setDriverCores(driverCore);
+        app.setExecutorMemoryOverhead(executorMemoryOverhead);
+        app.setDriverMemoryOverhead(driverMemoryOverhead);
+
+        for (SparkExecutor executor : executors.values()) {
+            String executorID = executor.getTags().get(SparkJobTagName.SPARK_EXECUTOR_ID.toString());
+            if (executorID.equalsIgnoreCase("driver")) {
+                executor.setExecMemoryBytes(driverMemory);
+                executor.setCores(driverCore);
+                executor.setMemoryOverhead(driverMemoryOverhead);
+            } else {
+                executor.setExecMemoryBytes(executorMemory);
+                executor.setCores(executorCore);
+                executor.setMemoryOverhead(executorMemoryOverhead);
+            }
+            if (app.getEndTime() <= 0L) {
+                app.setEndTime(this.lastEventTime);
+            }
+            if (executor.getEndTime() <= 0L) {
+                executor.setEndTime(app.getEndTime());
+            }
+            this.aggregateExecutorToApp(executor);
+        }
+        this.flushEntities(executors.values(), false);
+        // mirrors tricky accounting in Spark's own code: the skipped-task count is reported from the completed-task count
+        app.setSkippedTasks(app.getCompleteTasks());
+        this.flushEntities(app, true);
+    }
+
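+    // Resolve the YARN memory overhead for the given setting: prefer the value
+    // configured on the job (treated as MB when it has no unit), otherwise fall
+    // back to max(configured minimum overhead, a configured percentage of the base memory).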
+    private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) {
+        long result = 0L;
+        String fieldValue = config.getConfig().get(fieldName);
+        if (fieldValue != null) {
+            result = Utils.parseMemory(fieldValue + "m");
+            if (result == 0L) {
+                result = Utils.parseMemory(fieldValue);
+            }
+        }
+
+        if (result == 0L) {
+            result = Math.max(
+                    Utils.parseMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")),
+                    executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100);
+        }
+        return result;
+    }
+
+    private void aggregateExecutorToApp(SparkExecutor executor) {
+        long totalExecutorTime = app.getTotalExecutorTime() + executor.getEndTime() - executor.getStartTime();
+        if (totalExecutorTime < 0L) {
+            totalExecutorTime = 0L;
+        }
+        app.setTotalExecutorTime(totalExecutorTime);
+    }
+
+    private void aggregateJobToApp(SparkJob job) {
+        //aggregate job level metrics
+        app.setNumJobs(app.getNumJobs() + 1);
+        app.setTotalTasks(app.getTotalTasks() + job.getNumTask());
+        app.setCompleteTasks(app.getCompleteTasks() + job.getNumCompletedTasks());
+        app.setSkippedTasks(app.getSkippedTasks() + job.getNumSkippedTasks());
+        app.setFailedTasks(app.getFailedTasks() + job.getNumFailedTasks());
+        app.setTotalStages(app.getTotalStages() + job.getNumStages());
+        app.setFailedStages(app.getFailedStages() + job.getNumFailedStages());
+        app.setSkippedStages(app.getSkippedStages() + job.getNumSkippedStages());
+    }
+
+    private void aggregateStageToApp(SparkStage stage) {
+        //aggregate task level metrics
+        app.setDiskBytesSpilled(app.getDiskBytesSpilled() + stage.getDiskBytesSpilled());
+        app.setMemoryBytesSpilled(app.getMemoryBytesSpilled() + stage.getMemoryBytesSpilled());
+        app.setExecutorRunTime(app.getExecutorRunTime() + stage.getExecutorRunTime());
+        app.setJvmGcTime(app.getJvmGcTime() + stage.getJvmGcTime());
+        app.setExecutorDeserializeTime(app.getExecutorDeserializeTime() + stage.getExecutorDeserializeTime());
+        app.setResultSerializationTime(app.getResultSerializationTime() + stage.getResultSerializationTime());
+        app.setResultSize(app.getResultSize() + stage.getResultSize());
+        app.setInputRecords(app.getInputRecords() + stage.getInputRecords());
+        app.setInputBytes(app.getInputBytes() + stage.getInputBytes());
+        app.setOutputRecords(app.getOutputRecords() + stage.getOutputRecords());
+        app.setOutputBytes(app.getOutputBytes() + stage.getOutputBytes());
+        app.setShuffleWriteRecords(app.getShuffleWriteRecords() + stage.getShuffleWriteRecords());
+        app.setShuffleWriteBytes(app.getShuffleWriteBytes() + stage.getShuffleWriteBytes());
+        app.setShuffleReadRecords(app.getShuffleReadRecords() + stage.getShuffleReadRecords());
+        app.setShuffleReadBytes(app.getShuffleReadBytes() + stage.getShuffleReadBytes());
+    }
+
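+    // Roll task-level metrics up into the owning stage, tracking per-task-index
+    // success across attempts so a retried task is not counted twice.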
+    private void aggregateToStage(SparkTask task) {
+        String stageId = task.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
+        String stageAttemptId = task.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
+        String key = this.generateStageKey(stageId, stageAttemptId);
+        SparkStage stage = stages.get(key);
+
+        stage.setDiskBytesSpilled(stage.getDiskBytesSpilled() + task.getDiskBytesSpilled());
+        stage.setMemoryBytesSpilled(stage.getMemoryBytesSpilled() + task.getMemoryBytesSpilled());
+        stage.setExecutorRunTime(stage.getExecutorRunTime() + task.getExecutorRunTime());
+        stage.setJvmGcTime(stage.getJvmGcTime() + task.getJvmGcTime());
+        stage.setExecutorDeserializeTime(stage.getExecutorDeserializeTime() + task.getExecutorDeserializeTime());
+        stage.setResultSerializationTime(stage.getResultSerializationTime() + task.getResultSerializationTime());
+        stage.setResultSize(stage.getResultSize() + task.getResultSize());
+        stage.setInputRecords(stage.getInputRecords() + task.getInputRecords());
+        stage.setInputBytes(stage.getInputBytes() + task.getInputBytes());
+        stage.setOutputRecords(stage.getOutputRecords() + task.getOutputRecords());
+        stage.setOutputBytes(stage.getOutputBytes() + task.getOutputBytes());
+        stage.setShuffleWriteRecords(stage.getShuffleWriteRecords() + task.getShuffleWriteRecords());
+        stage.setShuffleWriteBytes(stage.getShuffleWriteBytes() + task.getShuffleWriteBytes());
+        stage.setShuffleReadRecords(stage.getShuffleReadRecords() + task.getShuffleReadRecords());
+        long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
+        stage.setShuffleReadBytes(stage.getShuffleReadBytes() + taskShuffleReadBytes);
+
+        boolean success = !task.isFailed();
+
+        Integer taskIndex = Integer.parseInt(task.getTags().get(SparkJobTagName.SPARK_TASK_INDEX.toString()));
+        if (stageTaskStatusMap.get(key).containsKey(taskIndex)) {
+            // an earlier attempt of this task (same task index within the stage) was already recorded
+            boolean previousResult = stageTaskStatusMap.get(key).get(taskIndex);
+            success = previousResult || success;
+            if (previousResult != success) {
+                stage.setNumFailedTasks(stage.getNumFailedTasks() - 1);
+                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
+                stageTaskStatusMap.get(key).put(taskIndex, success);
+            }
+        } else {
+            if (success) {
+                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
+            } else {
+                stage.setNumFailedTasks(stage.getNumFailedTasks() + 1);
+            }
+            stageTaskStatusMap.get(key).put(taskIndex, success);
+        }
+    }
+
+    private void aggregateToExecutor(SparkTask task) {
+        String executorId = task.getExecutorId();
+        SparkExecutor executor = executors.get(executorId);
+
+        if (null != executor) {
+            executor.setTotalTasks(executor.getTotalTasks() + 1);
+            if (task.isFailed()) {
+                executor.setFailedTasks(executor.getFailedTasks() + 1);
+            } else {
+                executor.setCompletedTasks(executor.getCompletedTasks() + 1);
+            }
+            long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
+            executor.setTotalShuffleRead(executor.getTotalShuffleRead() + taskShuffleReadBytes);
+            executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime());
+            executor.setTotalInputBytes(executor.getTotalInputBytes() + task.getInputBytes());
+            executor.setTotalShuffleWrite(executor.getTotalShuffleWrite() + task.getShuffleWriteBytes());
+        }
+    }
+
+    private void aggregateToJob(SparkStage stage) {
+        int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
+        SparkJob job = jobs.get(jobId);
+        job.setNumCompletedTasks(job.getNumCompletedTasks() + stage.getNumCompletedTasks());
+        job.setNumFailedTasks(job.getNumFailedTasks() + stage.getNumFailedTasks());
+        job.setNumTask(job.getNumTask() + stage.getNumTasks());
+
+        if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
+            //if multiple attempts succeed, just count one
+            if (!hasStagePriorAttemptSuccess(stage)) {
+                job.setNumCompletedStages(job.getNumCompletedStages() + 1);
+            }
+        } else {
+            job.setNumFailedStages(job.getNumFailedStages() + 1);
+        }
+    }
+
+    private boolean hasStagePriorAttemptSuccess(SparkStage stage) {
+        int stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()));
+        for (int i = 0; i < stageAttemptId; i++) {
+            SparkStage previousStage = stages.get(this.generateStageKey(
+                    stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), Integer.toString(i)));
+            if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+
+    private String generateStageKey(String stageId, String stageAttemptId) {
+        return stageId + "-" + stageAttemptId;
+    }
+
+    private void initiateStage(int jobId, int stageId, int stageAttemptId, String name, int numTasks) {
+        SparkStage stage = new SparkStage();
+        stage.setTags(new HashMap<>(this.app.getTags()));
+        stage.setTimestamp(app.getTimestamp());
+        stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
+        stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Integer.toString(stageId));
+        stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Integer.toString(stageAttemptId));
+        stage.setName(name);
+        stage.setNumActiveTasks(0);
+        stage.setNumTasks(numTasks);
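+        // Use the configured fair-scheduler pool when present, otherwise fall back to Spark's "default" pool.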
+        stage.setSchedulingPool(this.app.getConfig().getConfig().get("spark.scheduler.pool") == null ?
+                "default" : this.app.getConfig().getConfig().get("spark.scheduler.pool"));
+
+        String stageKey = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
+        stages.put(stageKey, stage);
+        this.jobStageMap.get(jobId).add(stageKey);
+    }
+
+
+    private SparkExecutor initiateExecutor(String executorID, long startTime) {
+        if (!executors.containsKey(executorID)) {
+            SparkExecutor executor = new SparkExecutor();
+            executor.setTags(new HashMap<>(this.app.getTags()));
+            executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executorID);
+            executor.setStartTime(startTime);
+            executor.setTimestamp(app.getTimestamp());
+
+            this.executors.put(executorID, executor);
+        }
+
+        return this.executors.get(executorID);
+    }
+
+    private String getNormalizedName(String jobName, String assignedName) {
+        if (null != assignedName) {
+            return assignedName;
+        } else {
+            return JobNameNormalization.getInstance().normalize(jobName);
+        }
+    }
+
+    private void flushEntities(Object entity, boolean forceFlush) {
+        this.flushEntities(Collections.singletonList(entity), forceFlush);
+    }
+
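+    // Buffers entities and flushes them to the Eagle service in batches of FLUSH_LIMIT (or immediately when forced).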
+    private void flushEntities(Collection entities, boolean forceFlush) {
+        this.createEntities.addAll(entities);
+
+        if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) {
+            try {
+                this.doFlush(this.createEntities);
+                this.createEntities.clear();
+            } catch (Exception e) {
+                LOG.error("Fail to flush entities", e);
+            }
+
+        }
+    }
+
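+    // Builds the Eagle service client from the eagleProps.eagle.service.* settings in application.conf.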
+    private EagleServiceBaseClient initiateClient() {
+        String host = conf.getString("eagleProps.eagle.service.host");
+        int port = conf.getInt("eagleProps.eagle.service.port");
+        String userName = conf.getString("eagleProps.eagle.service.username");
+        String pwd = conf.getString("eagleProps.eagle.service.password");
+        client = new EagleServiceClientImpl(host, port, userName, pwd);
+        int timeout = conf.getInt("eagleProps.eagle.service.read.timeout");
+        client.getJerseyClient().setReadTimeout(timeout * 1000);
+
+        return client;
+    }
+
+    private void doFlush(List entities) throws IOException, EagleServiceClientException {
+        client.create(entities);
+        int size = (entities == null ? 0 : entities.size());
+        LOG.info("finish flushing entities of total number " + size);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
new file mode 100644
index 0000000..b1dd09c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+public class JHFSparkParser implements JHFParserBase {
+
+    private static final Logger logger = LoggerFactory.getLogger(JHFSparkParser.class);
+
+    private boolean isValidJson;
+
+    private JHFSparkEventReader eventReader;
+
+    public JHFSparkParser(JHFSparkEventReader reader) {
+        this.eventReader = reader;
+    }
+
+    @Override
+    public void parse(InputStream is) throws Exception {
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
+            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+                isValidJson = true;
+                JSONObject eventObj = parseAndValidateJSON(line);
+                if (isValidJson) {
+                    try {
+                        this.eventReader.read(eventObj);
+                    } catch (Exception e) {
+                        logger.error("Failed to read event: " + eventObj, e);
+                    }
+                }
+            }
+
+            this.eventReader.clearReader();
+        }
+    }
+
+    private JSONObject parseAndValidateJSON(String line) {
+        JSONObject eventObj = null;
+        JSONParser parser = new JSONParser();
+        try {
+            eventObj = (JSONObject) parser.parse(line);
+        } catch (ParseException ex) {
+            isValidJson = false;
+            logger.error(String.format("Invalid JSON string, failed to parse: %s", line), ex);
+        }
+        return eventObj;
+    }
+}
\ No newline at end of file
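
For context, each line of a Spark event log is a self-contained JSON object keyed by "Event".
A minimal sketch of driving the new parser from memory (the event line and the empty tag map
are illustrative placeholders, the usual java.io/java.nio/java.util imports are assumed, and
JHFSparkEventReader is assumed to find its Eagle service settings in application.conf):

    String line = "{\"Event\": \"SparkListenerApplicationEnd\", \"Timestamp\": 1473216000000}";
    InputStream in = new ByteArrayInputStream(line.getBytes(StandardCharsets.UTF_8));
    JHFParserBase parser = new JHFSparkParser(
            new JHFSparkEventReader(new HashMap<String, String>(), new SparkApplicationInfo()));
    parser.parse(in);  // closes the stream and flushes remaining entities via clearReader()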

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java
new file mode 100644
index 0000000..c206b71
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+public class SparkApplicationInfo {
+
+    private String state;
+    private String finalStatus;
+    private String queue;
+    private String name;
+    private String user;
+
+    public String getState() {
+        return state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+    }
+
+    public String getFinalStatus() {
+        return finalStatus;
+    }
+
+    public void setFinalStatus(String finalStatus) {
+        this.finalStatus = finalStatus;
+    }
+
+    public String getQueue() {
+        return queue;
+    }
+
+    public void setQueue(String queue) {
+        this.queue = queue;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
new file mode 100644
index 0000000..0144410
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import org.apache.eagle.jpm.util.SparkJobTagName;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReader {
+
+    private String site;
+    private SparkApplicationInfo app;
+
+
+    public SparkFilesystemInputStreamReaderImpl(String site, SparkApplicationInfo app) {
+        this.site = site;
+        this.app = app;
+    }
+
+    @Override
+    public void read(InputStream is) throws Exception {
+        Map<String, String> baseTags = new HashMap<>();
+        baseTags.put(SparkJobTagName.SITE.toString(), site);
+        baseTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue());
+        JHFParserBase parser = new JHFSparkParser(new JHFSparkEventReader(baseTags, this.app));
+        parser.parse(is);
+    }
+
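+    // Ad-hoc local smoke test; the site name and file path below are environment-specific sample values.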
+    public static void main(String[] args) throws Exception {
+        SparkFilesystemInputStreamReaderImpl impl = new SparkFilesystemInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo());
+        impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1")));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
index 9fafc1f..0bb65df 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
@@ -19,7 +19,7 @@
 
 package org.apache.eagle.jpm.spark.history.status;
 
-import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
+import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo;
 import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -233,10 +233,11 @@ public class JobHistoryZKStateManager {
                     curator.setData().forPath(path, status.toString().getBytes("UTF-8"));
                 }
             } else {
-                LOG.error("Failed to update for application with path: " + path);
+                LOG.warn("failed to update with status {} due to path {} not existing ", status, path);
+                //throw new RuntimeException("Failed to update for application with path: " + path);
             }
         } catch (Exception e) {
-            LOG.error("fail to update application status", e);
+            LOG.error("fail to update application status as {}", status, e);
             throw new RuntimeException(e);
         } finally {
             try {
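
The surrounding method (only partially shown in this hunk) follows Curator's check-then-set
pattern; a minimal sketch of that pattern, under the same assumptions as the code above:

    if (curator.checkExists().forPath(path) != null) {
        curator.setData().forPath(path, status.toString().getBytes("UTF-8"));
    } else {
        LOG.warn("Failed to update status to {} because path {} does not exist", status, path);
    }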

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
index e88c62f..0351de3 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
@@ -19,9 +19,9 @@
 
 package org.apache.eagle.jpm.spark.history.storm;
 
-import org.apache.eagle.jpm.spark.crawl.JHFInputStreamReader;
-import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
-import org.apache.eagle.jpm.spark.crawl.SparkFilesystemInputStreamReaderImpl;
+import org.apache.eagle.jpm.spark.history.crawl.JHFInputStreamReader;
+import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo;
+import org.apache.eagle.jpm.spark.history.crawl.SparkFilesystemInputStreamReaderImpl;
 import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
 import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
 import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
@@ -106,9 +106,12 @@ public class SparkHistoryJobParseBolt extends BaseRichBolt {
             zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
             LOG.info("Successfully parse application {}", appId);
             collector.ack(tuple);
+        } catch (RuntimeException e) {
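+            // Treat runtime failures in parsing as non-retriable: mark the app FINISHED and ack so the tuple is not replayed.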
+            LOG.warn("fail to process application {} due to RuntimeException, ignore it", appId, e);
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
+            collector.ack(tuple);
         } catch (Exception e) {
-            LOG.error("Fail to process application {}", appId, e);
-            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FAILED);
+            LOG.error("Fail to process application {}, and retry", appId, e);
             collector.fail(tuple);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
index 5602b4c..4c50607 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
@@ -91,10 +91,11 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
                 LOG.info("emit " + appId);
                 zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
             }
-            LOG.info("{} apps sent.", appIds.size());
 
             if (appIds.isEmpty()) {
-                this.takeRest(60);
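+                // No pending applications; nap briefly before the next crawl.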
+                this.takeRest(10);
+            } else {
+                LOG.info("{} apps sent.", appIds.size());
             }
         } catch (Exception e) {
             LOG.error("Fail to run next tuple", e);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
index 26842b8..b94c603 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
@@ -153,24 +153,6 @@
             <value>http://sandbox.hortonworks.com:8088</value>
         </property>
         <property>
-            <name>storm.mode</name>
-            <displayName>mode</displayName>
-            <description>Storm Mode: local or cluster</description>
-            <value>local</value>
-        </property>
-        <property>
-            <name>storm.worker.num</name>
-            <displayName>worker.num</displayName>
-            <description>The number of workers</description>
-            <value>2</value>
-        </property>
-        <property>
-            <name>name</name>
-            <displayName>name</displayName>
-            <description>Name of the topology</description>
-            <value>sparkHistoryJob</value>
-        </property>
-        <property>
             <name>storm.messageTimeoutSec</name>
             <displayName>messageTimeoutSec</displayName>
             <description>Message timeout (in seconds)</description>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 58dd552..4c22b15 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -15,6 +15,9 @@
 
 
 {
+  "appId": "sparkHistoryJob",
+  "mode": "CLUSTER",
+  "workers" : 3,
   "basic":{
     "cluster":"sandbox",
     "dataCenter":"sandbox",
@@ -45,8 +48,6 @@
     }
   },
   "storm":{
-    worker.num: 2,
-    "name":"sparkHistoryJob",
     "messageTimeoutSec": 3000,
     "pendingSpout": 1000,
     "spoutCrawlInterval": 10000,#in ms
@@ -72,7 +73,5 @@
       spark.yarn.am.memoryOverhead.factor: 10,
       spark.yarn.overhead.min: "384m"
     }
-  },
-  "appId": "sparkHistoryJob",
-  "mode": "CLUSTER"
+  }
 }
\ No newline at end of file
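
The relocated top-level keys are what the application framework reads at submit time; a minimal
sketch of how they are consumed (using the same typesafe Config API as the rest of this patch):

    Config config = ConfigFactory.load();                  // picks up application.conf
    String topologyName = config.getString("appId");       // "sparkHistoryJob"
    boolean clusterMode = "CLUSTER".equalsIgnoreCase(config.getString("mode"));
    int workers = config.getInt("workers");                // Storm worker count, 3 here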

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
deleted file mode 100644
index 5d1cfaa..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import org.apache.eagle.log.entity.repo.EntityRepository;
-
-public class JPMEntityRepository extends EntityRepository {
-    public JPMEntityRepository() {
-        entitySet.add(SparkAppEntity.class);
-        entitySet.add(SparkJobEntity.class);
-        entitySet.add(SparkStageEntity.class);
-        entitySet.add(SparkTaskEntity.class);
-        entitySet.add(SparkExecutorEntity.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
deleted file mode 100644
index e18f1e7..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import java.io.Serializable;
-import java.util.HashMap;
-
-public class JobConfig extends HashMap<String, String> implements Serializable {
-}
-


[3/3] incubator-eagle git commit: Update spark history job feeder config & refactor the code

Posted by qi...@apache.org.
Update spark history job feeder config & refactor the code

Author: Qingwen Zhao <qi...@gmail.com>

Closes #416 from qingwen220/sparkHist.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3110c72e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3110c72e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3110c72e

Branch: refs/heads/develop
Commit: 3110c72e47f697f5c59b5f7d6559d527cf25db3a
Parents: 8774b85
Author: Qingwen Zhao <qi...@gmail.com>
Authored: Wed Sep 7 10:36:24 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Wed Sep 7 10:36:24 2016 +0800

----------------------------------------------------------------------
 .../environment/impl/StormExecutionRuntime.java |  32 +-
 .../apache/eagle/jpm/spark/crawl/EventType.java |  24 -
 .../jpm/spark/crawl/JHFInputStreamReader.java   |  24 -
 .../eagle/jpm/spark/crawl/JHFParserBase.java    |  29 -
 .../jpm/spark/crawl/JHFSparkEventReader.java    | 713 -------------------
 .../eagle/jpm/spark/crawl/JHFSparkParser.java   |  73 --
 .../jpm/spark/crawl/SparkApplicationInfo.java   |  69 --
 .../SparkFilesystemInputStreamReaderImpl.java   |  53 --
 .../running/entities/JPMEntityRepository.java   |  33 +
 .../jpm/spark/running/entities/JobConfig.java   |  26 +
 .../spark/running/entities/SparkAppEntity.java  | 476 +++++++++++++
 .../running/entities/SparkExecutorEntity.java   | 233 ++++++
 .../spark/running/entities/SparkJobEntity.java  | 191 +++++
 .../running/entities/SparkStageEntity.java      | 299 ++++++++
 .../spark/running/entities/SparkTaskEntity.java | 290 ++++++++
 .../spark/history/SparkHistoryJobAppConfig.java |   4 -
 .../history/crawl/JHFInputStreamReader.java     |  24 +
 .../jpm/spark/history/crawl/JHFParserBase.java  |  29 +
 .../history/crawl/JHFSparkEventReader.java      | 713 +++++++++++++++++++
 .../jpm/spark/history/crawl/JHFSparkParser.java |  73 ++
 .../history/crawl/SparkApplicationInfo.java     |  69 ++
 .../SparkFilesystemInputStreamReaderImpl.java   |  53 ++
 .../status/JobHistoryZKStateManager.java        |   7 +-
 .../history/storm/SparkHistoryJobParseBolt.java |  13 +-
 .../history/storm/SparkHistoryJobSpout.java     |   5 +-
 ...spark.history.SparkHistoryJobAppProvider.xml |  18 -
 .../src/main/resources/application.conf         |   9 +-
 .../running/entities/JPMEntityRepository.java   |  30 -
 .../jpm/spark/running/entities/JobConfig.java   |  25 -
 .../spark/running/entities/SparkAppEntity.java  | 475 ------------
 .../running/entities/SparkExecutorEntity.java   | 232 ------
 .../spark/running/entities/SparkJobEntity.java  | 190 -----
 .../running/entities/SparkStageEntity.java      | 298 --------
 .../spark/running/entities/SparkTaskEntity.java | 289 --------
 .../running/parser/SparkApplicationParser.java  |   8 +-
 .../src/main/resources/application.conf         |   6 +-
 .../apache/eagle/jpm/util/SparkEventType.java   |  25 +
 .../util/resourcefetch/RMResourceFetcher.java   |   2 +-
 38 files changed, 2575 insertions(+), 2587 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 04cc19b..e37e8f2 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -34,13 +34,13 @@ import scala.Int;
 import storm.trident.spout.RichSpoutBatchExecutor;
 
 public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> {
-    private final static Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
     private static LocalCluster _localCluster;
 
     private StormEnvironment environment;
 
-    private static LocalCluster getLocalCluster(){
-        if(_localCluster == null){
+    private static LocalCluster getLocalCluster() {
+        if (_localCluster == null) {
             _localCluster = new LocalCluster();
         }
         return _localCluster;
@@ -56,13 +56,13 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
         return this.environment;
     }
 
-    private final static String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
-    private final static String STORM_NIMBUS_HOST_DEFAULT = "localhost";
-    private final static Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
-    private final static String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
+    private static final String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
+    private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost";
+    private static final Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
+    private static final String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
     private static final String WORKERS = "workers";
 
-    public backtype.storm.Config getStormConfig(){
+    public backtype.storm.Config getStormConfig() {
         backtype.storm.Config conf = new backtype.storm.Config();
         conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
         conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8));
@@ -71,14 +71,14 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
         conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
         conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
         String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
-        if(environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
+        if (environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
             nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
             LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
         } else {
             LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
         }
         Integer nimbusThriftPort =  STORM_NIMBUS_THRIFT_DEFAULT;
-        if(environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
+        if (environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
             nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
             LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
         } else {
@@ -94,15 +94,15 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
     }
 
     @Override
-    public void start(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config){
+    public void start(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
         String topologyName = config.getString("appId");
-        Preconditions.checkNotNull(topologyName,"[appId] is required by null for "+executor.getClass().getCanonicalName());
+        Preconditions.checkNotNull(topologyName, "[appId] is required but is null for " + executor.getClass().getCanonicalName());
         StormTopology topology = executor.execute(config, environment);
-        LOG.info("Starting {} ({}), mode: {}",topologyName,executor.getClass().getCanonicalName(), config.getString("mode"));
+        LOG.info("Starting {} ({}), mode: {}",topologyName, executor.getClass().getCanonicalName(), config.getString("mode"));
         Config conf = getStormConfig();
-        if(ApplicationEntity.Mode.CLUSTER.name().equalsIgnoreCase(config.getString("mode"))){
+        if (ApplicationEntity.Mode.CLUSTER.name().equalsIgnoreCase(config.getString("mode"))) {
             String jarFile = config.hasPath("jarPath") ? config.getString("jarPath") : null;
-            if(jarFile == null){
+            if (jarFile == null) {
                 jarFile = DynamicJarPathFinder.findPath(executor.getClass());
             }
             synchronized (StormExecutionRuntime.class) {
@@ -129,7 +129,7 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
     public void stop(Application<StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
         String appId = config.getString("appId");
         LOG.info("Stopping topology {} ..." + appId);
-        if(config.getString("mode") == ApplicationEntity.Mode.CLUSTER.name()){
+        if (config.getString("mode") == ApplicationEntity.Mode.CLUSTER.name()) {
             Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig()).getClient();
             try {
                 stormClient.killTopology(appId);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java
deleted file mode 100644
index 1ba15b7..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-public enum EventType {
-    SparkListenerBlockManagerAdded, SparkListenerEnvironmentUpdate, SparkListenerApplicationStart,
-    SparkListenerExecutorAdded, SparkListenerJobStart,SparkListenerStageSubmitted, SparkListenerTaskStart,SparkListenerBlockManagerRemoved,
-    SparkListenerTaskEnd, SparkListenerStageCompleted, SparkListenerJobEnd, SparkListenerApplicationEnd,SparkListenerExecutorRemoved
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
deleted file mode 100644
index 8a8d0db..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import java.io.InputStream;
-
-public interface JHFInputStreamReader {
-    void read(InputStream is) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
deleted file mode 100644
index 62ba7d9..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import java.io.InputStream;
-
-public interface JHFParserBase {
-    /**
-     * this method will ensure to close the inputStream.
-     * @param is
-     * @throws Exception
-     */
-    void parse(InputStream is) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
deleted file mode 100644
index 22b715a..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
+++ /dev/null
@@ -1,713 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.eagle.jpm.spark.entity.*;
-import org.apache.eagle.jpm.util.*;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.service.client.EagleServiceClientException;
-import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-public class JHFSparkEventReader {
-    private static final Logger LOG = LoggerFactory.getLogger(JHFSparkEventReader.class);
-
-    private static final int FLUSH_LIMIT = 500;
-    private long firstTaskLaunchTime;
-    private long lastEventTime;
-
-    private Map<String, SparkExecutor> executors;
-    private SparkApp app;
-    private Map<Integer, SparkJob> jobs;
-    private Map<String, SparkStage> stages;
-    private Map<Integer, Set<String>> jobStageMap;
-    private Map<Long, SparkTask> tasks;
-    private EagleServiceClientImpl client;
-    private Map<String, Map<Integer, Boolean>> stageTaskStatusMap;
-
-    private List<TaggedLogAPIEntity> createEntities;
-
-    private Config conf;
-
-    public JHFSparkEventReader(Map<String, String> baseTags, SparkApplicationInfo info) {
-        app = new SparkApp();
-        app.setTags(new HashMap<String, String>(baseTags));
-        app.setYarnState(info.getState());
-        app.setYarnStatus(info.getFinalStatus());
-        createEntities = new ArrayList<>();
-        jobs = new HashMap<Integer, SparkJob>();
-        stages = new HashMap<String, SparkStage>();
-        jobStageMap = new HashMap<Integer, Set<String>>();
-        tasks = new HashMap<Long, SparkTask>();
-        executors = new HashMap<String, SparkExecutor>();
-        stageTaskStatusMap = new HashMap<>();
-        conf = ConfigFactory.load();
-        this.initiateClient();
-    }
-
-    public SparkApp getApp() {
-        return this.app;
-    }
-
-    public void read(JSONObject eventObj) {
-        String eventType = (String) eventObj.get("Event");
-        if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationStart.toString())) {
-            handleAppStarted(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString())) {
-            handleEnvironmentSet(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorAdded.toString())) {
-            handleExecutorAdd(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerAdded.toString())) {
-            handleBlockManagerAdd(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobStart.toString())) {
-            handleJobStart(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageSubmitted.toString())) {
-            handleStageSubmit(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskStart.toString())) {
-            handleTaskStart(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskEnd.toString())) {
-            handleTaskEnd(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageCompleted.toString())) {
-            handleStageComplete(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobEnd.toString())) {
-            handleJobEnd(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorRemoved.toString())) {
-            handleExecutorRemoved(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationEnd.toString())) {
-            handleAppEnd(eventObj);
-        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerRemoved.toString())) {
-            //nothing to do now
-        } else {
-            LOG.info("Not registered event type:" + eventType);
-        }
-
-    }
-
-    private void handleEnvironmentSet(JSONObject event) {
-        app.setConfig(new JobConfig());
-        JSONObject sparkProps = (JSONObject) event.get("Spark Properties");
-
-        String[] additionalJobConf = conf.getString("basic.jobConf.additional.info").split(",\\s*");
-        String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port",
-            "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory",
-            "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"};
-        String[] jobConf = (String[])ArrayUtils.addAll(additionalJobConf, props);
-        for (String prop : jobConf) {
-            if (sparkProps.containsKey(prop)) {
-                app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop));
-            }
-        }
-    }
-
-    private Object getConfigVal(JobConfig config, String configName, String type) {
-        if (config.getConfig().containsKey(configName)) {
-            Object val = config.getConfig().get(configName);
-            if (type.equalsIgnoreCase(Integer.class.getName())) {
-                return Integer.parseInt((String) val);
-            } else {
-                return val;
-            }
-        } else {
-            if (type.equalsIgnoreCase(Integer.class.getName())) {
-                return conf.getInt("spark.defaultVal." + configName);
-            } else {
-                return conf.getString("spark.defaultVal." + configName);
-            }
-        }
-    }
-
-    private boolean isClientMode(JobConfig config) {
-        return config.getConfig().get("spark.master").equalsIgnoreCase("yarn-client");
-    }
-
-    private void handleAppStarted(JSONObject event) {
-        //need update all entities tag before app start
-        List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
-        entities.addAll(this.executors.values());
-        entities.add(this.app);
-
-        long appStartTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
-        for (TaggedLogAPIEntity entity : entities) {
-            entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), JSONUtils.getString(event, "App ID"));
-            entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), JSONUtils.getString(event, "App Name"));
-            // In yarn-client mode, attemptId is not available in the log, so we set attemptId = 1.
-            String attemptId = isClientMode(this.app.getConfig()) ? "1" : JSONUtils.getString(event, "App Attempt ID");
-            entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), attemptId);
-            // the second argument of getNormalizeName() is changed to null because the original code contains sensitive text
-            // original second argument looks like: this.app.getConfig().getConfig().get("xxx"), "xxx" is the sensitive text
-            entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), this.getNormalizedName(JSONUtils.getString(event, "App Name"), null));
-            entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), JSONUtils.getString(event, "User"));
-
-            entity.setTimestamp(appStartTime);
-        }
-
-        this.app.setStartTime(appStartTime);
-        this.lastEventTime = appStartTime;
-    }
-
-    private void handleExecutorAdd(JSONObject event) {
-        String executorID = (String) event.get("Executor ID");
-        long executorAddTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
-        this.lastEventTime = executorAddTime;
-        SparkExecutor executor = this.initiateExecutor(executorID, executorAddTime);
-
-        JSONObject executorInfo = JSONUtils.getJSONObject(event, "Executor Info");
-
-    }
-
-    private void handleBlockManagerAdd(JSONObject event) {
-        long maxMemory = JSONUtils.getLong(event, "Maximum Memory");
-        long timestamp = JSONUtils.getLong(event, "Timestamp", lastEventTime);
-        this.lastEventTime = timestamp;
-        JSONObject blockInfo = JSONUtils.getJSONObject(event, "Block Manager ID");
-        String executorID = JSONUtils.getString(blockInfo, "Executor ID");
-        String hostAndPort = JSONUtils.getString(blockInfo, "Host") + ":" + JSONUtils.getLong(blockInfo, "Port");
-
-        SparkExecutor executor = this.initiateExecutor(executorID, timestamp);
-        executor.setMaxMemory(maxMemory);
-        executor.setHostPort(hostAndPort);
-    }
-
-    private void handleTaskStart(JSONObject event) {
-        this.initializeTask(event);
-    }
-
-    private void handleTaskEnd(JSONObject event) {
-        JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
-        long taskId = JSONUtils.getLong(taskInfo, "Task ID");
-        SparkTask task = tasks.get(taskId);
-        if (task == null) {
-            return;
-        }
-
-        task.setFailed(JSONUtils.getBoolean(taskInfo, "Failed"));
-        JSONObject taskMetrics = JSONUtils.getJSONObject(event, "Task Metrics");
-        if (null != taskMetrics) {
-            task.setExecutorDeserializeTime(JSONUtils.getLong(taskMetrics, "Executor Deserialize Time", lastEventTime));
-            task.setExecutorRunTime(JSONUtils.getLong(taskMetrics, "Executor Run Time", lastEventTime));
-            task.setJvmGcTime(JSONUtils.getLong(taskMetrics, "JVM GC Time", lastEventTime));
-            task.setResultSize(JSONUtils.getLong(taskMetrics, "Result Size"));
-            task.setResultSerializationTime(JSONUtils.getLong(taskMetrics, "Result Serialization Time", lastEventTime));
-            task.setMemoryBytesSpilled(JSONUtils.getLong(taskMetrics, "Memory Bytes Spilled"));
-            task.setDiskBytesSpilled(JSONUtils.getLong(taskMetrics, "Disk Bytes Spilled"));
-
-            JSONObject inputMetrics = JSONUtils.getJSONObject(taskMetrics, "Input Metrics");
-            if (null != inputMetrics) {
-                task.setInputBytes(JSONUtils.getLong(inputMetrics, "Bytes Read"));
-                task.setInputRecords(JSONUtils.getLong(inputMetrics, "Records Read"));
-            }
-
-            JSONObject outputMetrics = JSONUtils.getJSONObject(taskMetrics, "Output Metrics");
-            if (null != outputMetrics) {
-                task.setOutputBytes(JSONUtils.getLong(outputMetrics, "Bytes Written"));
-                task.setOutputRecords(JSONUtils.getLong(outputMetrics, "Records Written"));
-            }
-
-            JSONObject shuffleWriteMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Write Metrics");
-            if (null != shuffleWriteMetrics) {
-                task.setShuffleWriteBytes(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Bytes Written"));
-                task.setShuffleWriteRecords(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Records Written"));
-            }
-
-            JSONObject shuffleReadMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Read Metrics");
-            if (null != shuffleReadMetrics) {
-                task.setShuffleReadLocalBytes(JSONUtils.getLong(shuffleReadMetrics, "Local Bytes Read"));
-                task.setShuffleReadRemoteBytes(JSONUtils.getLong(shuffleReadMetrics, "Remote Bytes Read"));
-                task.setShuffleReadRecords(JSONUtils.getLong(shuffleReadMetrics, "Total Records Read"));
-            }
-        } else {
-            //for tasks success without task metrics, save in the end if no other information
-            if (!task.isFailed()) {
-                return;
-            }
-        }
-
-        aggregateToStage(task);
-        aggregateToExecutor(task);
-        tasks.remove(taskId);
-        this.flushEntities(task, false);
-    }
-
-
-    private SparkTask initializeTask(JSONObject event) {
-        SparkTask task = new SparkTask();
-        task.setTags(new HashMap<>(this.app.getTags()));
-        task.setTimestamp(app.getTimestamp());
-
-        task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage ID")));
-        task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage Attempt ID")));
-
-        JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
-        long taskId = JSONUtils.getLong(taskInfo, "Task ID");
-        task.setTaskId(taskId);
-
-        task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), Long.toString(JSONUtils.getLong(taskInfo, "Index")));
-        task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), Integer.toString(JSONUtils.getInt(taskInfo, "Attempt")));
-        long launchTime = JSONUtils.getLong(taskInfo, "Launch Time", lastEventTime);
-        this.lastEventTime = launchTime;
-        if (taskId == 0) {
-            this.setFirstTaskLaunchTime(launchTime);
-        }
-        task.setLaunchTime(launchTime);
-        task.setExecutorId(JSONUtils.getString(taskInfo, "Executor ID"));
-        task.setHost(JSONUtils.getString(taskInfo, "Host"));
-        task.setTaskLocality(JSONUtils.getString(taskInfo, "Locality"));
-        task.setSpeculative(JSONUtils.getBoolean(taskInfo, "Speculative"));
-
-        tasks.put(task.getTaskId(), task);
-        return task;
-    }
-
-    private void setFirstTaskLaunchTime(long launchTime) {
-        this.firstTaskLaunchTime = launchTime;
-    }
-
-    private void handleJobStart(JSONObject event) {
-        SparkJob job = new SparkJob();
-        job.setTags(new HashMap<>(this.app.getTags()));
-        job.setTimestamp(app.getTimestamp());
-
-        int jobId = JSONUtils.getInt(event, "Job ID");
-        job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
-        long submissionTime = JSONUtils.getLong(event, "Submission Time", lastEventTime);
-        job.setSubmissionTime(submissionTime);
-        this.lastEventTime = submissionTime;
-
-        //for complete application, no active stages/tasks
-        job.setNumActiveStages(0);
-        job.setNumActiveTasks(0);
-
-        this.jobs.put(jobId, job);
-        this.jobStageMap.put(jobId, new HashSet<String>());
-
-        JSONArray stages = JSONUtils.getJSONArray(event, "Stage Infos");
-        int stagesSize = (stages == null ? 0 : stages.size());
-        job.setNumStages(stagesSize);
-        for (int i = 0; i < stagesSize; i++) {
-            JSONObject stageInfo = (JSONObject) stages.get(i);
-            int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
-            int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
-            String stageName = JSONUtils.getString(stageInfo, "Stage Name");
-            int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks");
-            this.initiateStage(jobId, stageId, stageAttemptId, stageName, numTasks);
-        }
-    }
-
-    private void handleStageSubmit(JSONObject event) {
-        JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
-        int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
-        int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
-        String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
-        stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>());
-
-        if (!stages.containsKey(key)) {
-            //may be a further attempt of an already-seen stage
-            String baseAttempt = this.generateStageKey(Integer.toString(stageId), "0");
-            if (stages.containsKey(baseAttempt)) {
-                SparkStage stage = stages.get(baseAttempt);
-                String jobId = stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString());
-
-                String stageName = JSONUtils.getString(stageInfo, "Stage Name");
-                int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks");
-                this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks);
-            }
-        }
-    }
-
-    private void handleStageComplete(JSONObject event) {
-        JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
-        int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
-        int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
-        String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
-        SparkStage stage = stages.get(key);
-
-        // If "Submission Time" is not available, use the "Launch Time" of "Task ID" = 0.
-        Long submissionTime = JSONUtils.getLong(stageInfo, "Submission Time", firstTaskLaunchTime);
-
-        stage.setSubmitTime(submissionTime);
-
-        long completeTime = JSONUtils.getLong(stageInfo, "Completion Time", lastEventTime);
-        stage.setCompleteTime(completeTime);
-        this.lastEventTime = completeTime;
-
-        if (stageInfo.containsKey("Failure Reason")) {
-            stage.setStatus(SparkEntityConstant.SparkStageStatus.FAILED.toString());
-        } else {
-            stage.setStatus(SparkEntityConstant.SparkStageStatus.COMPLETE.toString());
-        }
-    }
-
-    private void handleExecutorRemoved(JSONObject event) {
-        String executorID = JSONUtils.getString(event, "Executor ID");
-        SparkExecutor executor = executors.get(executorID);
-        long removedTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
-        executor.setEndTime(removedTime);
-        this.lastEventTime = removedTime;
-    }
-
-    private void handleJobEnd(JSONObject event) {
-        int jobId = JSONUtils.getInt(event, "Job ID");
-        SparkJob job = jobs.get(jobId);
-
-        long completionTime = JSONUtils.getLong(event, "Completion Time", lastEventTime);
-        job.setCompletionTime(completionTime);
-        this.lastEventTime = completionTime;
-
-        JSONObject jobResult = JSONUtils.getJSONObject(event, "Job Result");
-        String result = JSONUtils.getString(jobResult, "Result");
-        if (result.equalsIgnoreCase("JobSucceeded")) {
-            job.setStatus(SparkEntityConstant.SparkJobStatus.SUCCEEDED.toString());
-        } else {
-            job.setStatus(SparkEntityConstant.SparkJobStatus.FAILED.toString());
-        }
-    }
-
-    private void handleAppEnd(JSONObject event) {
-        long endTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
-        app.setEndTime(endTime);
-        this.lastEventTime = endTime;
-    }
-
-    public void clearReader() throws Exception {
-        //clear tasks
-        for (SparkTask task : tasks.values()) {
-            LOG.info("Task {} does not have result or no task metrics.", task.getTaskId());
-            task.setFailed(true);
-            aggregateToStage(task);
-            aggregateToExecutor(task);
-            this.flushEntities(task, false);
-        }
-
-        List<SparkStage> needStoreStages = new ArrayList<>();
-        for (SparkStage stage : this.stages.values()) {
-            int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
-            if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) {
-                SparkJob job = this.jobs.get(jobId);
-                job.setNumSkippedStages(job.getNumSkippedStages() + 1);
-                job.setNumSkippedTasks(job.getNumSkippedTasks() + stage.getNumTasks());
-            } else {
-                this.aggregateToJob(stage);
-                this.aggregateStageToApp(stage);
-                needStoreStages.add(stage);
-            }
-            String stageId = stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
-            String stageAttemptId = stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
-            this.jobStageMap.get(jobId).remove(this.generateStageKey(stageId, stageAttemptId));
-        }
-
-        this.flushEntities(needStoreStages, false);
-        for (SparkJob job : jobs.values()) {
-            this.aggregateJobToApp(job);
-        }
-        this.flushEntities(jobs.values(), false);
-
-        app.setExecutors(executors.values().size());
-
-        long executorMemory = Utils.parseMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName()));
-        long driverMemory = Utils.parseMemory(this.isClientMode(app.getConfig())
-            ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName())
-            : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName()));
-
-        int executorCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName());
-        int driverCore = this.isClientMode(app.getConfig())
-            ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName())
-            : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName());
-
-        long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead");
-        long driverMemoryOverhead = this.isClientMode(app.getConfig())
-            ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead")
-            : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead");
-
-        app.setExecMemoryBytes(executorMemory);
-        app.setDriveMemoryBytes(driverMemory);
-        app.setExecutorCores(executorCore);
-        app.setDriverCores(driverCore);
-        app.setExecutorMemoryOverhead(executorMemoryOverhead);
-        app.setDriverMemoryOverhead(driverMemoryOverhead);
-
-        for (SparkExecutor executor : executors.values()) {
-            String executorID = executor.getTags().get(SparkJobTagName.SPARK_EXECUTOR_ID.toString());
-            if (executorID.equalsIgnoreCase("driver")) {
-                executor.setExecMemoryBytes(driverMemory);
-                executor.setCores(driverCore);
-                executor.setMemoryOverhead(driverMemoryOverhead);
-            } else {
-                executor.setExecMemoryBytes(executorMemory);
-                executor.setCores(executorCore);
-                executor.setMemoryOverhead(executorMemoryOverhead);
-            }
-            if (app.getEndTime() <= 0L) {
-                app.setEndTime(this.lastEventTime);
-            }
-            if (executor.getEndTime() <= 0L) {
-                executor.setEndTime(app.getEndTime());
-            }
-            this.aggregateExecutorToApp(executor);
-        }
-        this.flushEntities(executors.values(), false);
-        //mirrors tricky accounting in Spark's own code: report completed tasks as skipped
-        app.setSkippedTasks(app.getCompleteTasks());
-        this.flushEntities(app, true);
-    }
-
-    private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) {
-        long result = 0L;
-        String fieldValue = config.getConfig().get(fieldName);
-        if (fieldValue != null) {
-            result = Utils.parseMemory(fieldValue + "m");
-            if (result == 0L) {
-                result = Utils.parseMemory(fieldValue);
-            }
-        }
-
-        if (result == 0L) {
-            result = Math.max(
-                    Utils.parseMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")),
-                    executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100);
-        }
-        return result;
-    }
-
-    private void aggregateExecutorToApp(SparkExecutor executor) {
-        long totalExecutorTime = app.getTotalExecutorTime() + executor.getEndTime() - executor.getStartTime();
-        if (totalExecutorTime < 0L) {
-            totalExecutorTime = 0L;
-        }
-        app.setTotalExecutorTime(totalExecutorTime);
-    }
-
-    private void aggregateJobToApp(SparkJob job) {
-        //aggregate job level metrics
-        app.setNumJobs(app.getNumJobs() + 1);
-        app.setTotalTasks(app.getTotalTasks() + job.getNumTask());
-        app.setCompleteTasks(app.getCompleteTasks() + job.getNumCompletedTasks());
-        app.setSkippedTasks(app.getSkippedTasks() + job.getNumSkippedTasks());
-        app.setFailedTasks(app.getFailedTasks() + job.getNumFailedTasks());
-        app.setTotalStages(app.getTotalStages() + job.getNumStages());
-        app.setFailedStages(app.getFailedStages() + job.getNumFailedStages());
-        app.setSkippedStages(app.getSkippedStages() + job.getNumSkippedStages());
-    }
-
-    private void aggregateStageToApp(SparkStage stage) {
-        //aggregate task level metrics
-        app.setDiskBytesSpilled(app.getDiskBytesSpilled() + stage.getDiskBytesSpilled());
-        app.setMemoryBytesSpilled(app.getMemoryBytesSpilled() + stage.getMemoryBytesSpilled());
-        app.setExecutorRunTime(app.getExecutorRunTime() + stage.getExecutorRunTime());
-        app.setJvmGcTime(app.getJvmGcTime() + stage.getJvmGcTime());
-        app.setExecutorDeserializeTime(app.getExecutorDeserializeTime() + stage.getExecutorDeserializeTime());
-        app.setResultSerializationTime(app.getResultSerializationTime() + stage.getResultSerializationTime());
-        app.setResultSize(app.getResultSize() + stage.getResultSize());
-        app.setInputRecords(app.getInputRecords() + stage.getInputRecords());
-        app.setInputBytes(app.getInputBytes() + stage.getInputBytes());
-        app.setOutputRecords(app.getOutputRecords() + stage.getOutputRecords());
-        app.setOutputBytes(app.getOutputBytes() + stage.getOutputBytes());
-        app.setShuffleWriteRecords(app.getShuffleWriteRecords() + stage.getShuffleWriteRecords());
-        app.setShuffleWriteBytes(app.getShuffleWriteBytes() + stage.getShuffleWriteBytes());
-        app.setShuffleReadRecords(app.getShuffleReadRecords() + stage.getShuffleReadRecords());
-        app.setShuffleReadBytes(app.getShuffleReadBytes() + stage.getShuffleReadBytes());
-    }
-
-    private void aggregateToStage(SparkTask task) {
-        String stageId = task.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
-        String stageAttemptId = task.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
-        String key = this.generateStageKey(stageId, stageAttemptId);
-        SparkStage stage = stages.get(key);
-
-        stage.setDiskBytesSpilled(stage.getDiskBytesSpilled() + task.getDiskBytesSpilled());
-        stage.setMemoryBytesSpilled(stage.getMemoryBytesSpilled() + task.getMemoryBytesSpilled());
-        stage.setExecutorRunTime(stage.getExecutorRunTime() + task.getExecutorRunTime());
-        stage.setJvmGcTime(stage.getJvmGcTime() + task.getJvmGcTime());
-        stage.setExecutorDeserializeTime(stage.getExecutorDeserializeTime() + task.getExecutorDeserializeTime());
-        stage.setResultSerializationTime(stage.getResultSerializationTime() + task.getResultSerializationTime());
-        stage.setResultSize(stage.getResultSize() + task.getResultSize());
-        stage.setInputRecords(stage.getInputRecords() + task.getInputRecords());
-        stage.setInputBytes(stage.getInputBytes() + task.getInputBytes());
-        stage.setOutputRecords(stage.getOutputRecords() + task.getOutputRecords());
-        stage.setOutputBytes(stage.getOutputBytes() + task.getOutputBytes());
-        stage.setShuffleWriteRecords(stage.getShuffleWriteRecords() + task.getShuffleWriteRecords());
-        stage.setShuffleWriteBytes(stage.getShuffleWriteBytes() + task.getShuffleWriteBytes());
-        stage.setShuffleReadRecords(stage.getShuffleReadRecords() + task.getShuffleReadRecords());
-        long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
-        stage.setShuffleReadBytes(stage.getShuffleReadBytes() + taskShuffleReadBytes);
-
-        boolean success = !task.isFailed();
-
-        Integer taskIndex = Integer.parseInt(task.getTags().get(SparkJobTagName.SPARK_TASK_INDEX.toString()));
-        if (stageTaskStatusMap.get(key).containsKey(taskIndex)) {
-            //a previous attempt of this task index exists within the stage
-            boolean previousResult = stageTaskStatusMap.get(key).get(taskIndex);
-            success = previousResult || success;
-            if (previousResult != success) {
-                stage.setNumFailedTasks(stage.getNumFailedTasks() - 1);
-                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
-                stageTaskStatusMap.get(key).put(taskIndex, success);
-            }
-        } else {
-            if (success) {
-                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
-            } else {
-                stage.setNumFailedTasks(stage.getNumFailedTasks() + 1);
-            }
-            stageTaskStatusMap.get(key).put(taskIndex, success);
-        }
-
-    }
-
-    private void aggregateToExecutor(SparkTask task) {
-        String executorId = task.getExecutorId();
-        SparkExecutor executor = executors.get(executorId);
-
-        if (null != executor) {
-            executor.setTotalTasks(executor.getTotalTasks() + 1);
-            if (task.isFailed()) {
-                executor.setFailedTasks(executor.getFailedTasks() + 1);
-            } else {
-                executor.setCompletedTasks(executor.getCompletedTasks() + 1);
-            }
-            long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
-            executor.setTotalShuffleRead(executor.getTotalShuffleRead() + taskShuffleReadBytes);
-            executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime());
-            executor.setTotalInputBytes(executor.getTotalInputBytes() + task.getInputBytes());
-            executor.setTotalShuffleWrite(executor.getTotalShuffleWrite() + task.getShuffleWriteBytes());
-        }
-
-    }
-
-    private void aggregateToJob(SparkStage stage) {
-        int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
-        SparkJob job = jobs.get(jobId);
-        job.setNumCompletedTasks(job.getNumCompletedTasks() + stage.getNumCompletedTasks());
-        job.setNumFailedTasks(job.getNumFailedTasks() + stage.getNumFailedTasks());
-        job.setNumTask(job.getNumTask() + stage.getNumTasks());
-
-
-        if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
-            //if multiple attempts succeed, just count one
-            if (!hasStagePriorAttemptSuccess(stage)) {
-                job.setNumCompletedStages(job.getNumCompletedStages() + 1);
-            }
-        } else {
-            job.setNumFailedStages(job.getNumFailedStages() + 1);
-        }
-    }
-
-    private boolean hasStagePriorAttemptSuccess(SparkStage stage) {
-        int stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()));
-        for (int i = 0; i < stageAttemptId; i++) {
-            SparkStage previousStage = stages.get(this.generateStageKey(
-                    stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), Integer.toString(i)));
-            if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-
-    private String generateStageKey(String stageId, String stageAttemptId) {
-        return stageId + "-" + stageAttemptId;
-    }
-
-    private void initiateStage(int jobId, int stageId, int stageAttemptId, String name, int numTasks) {
-        SparkStage stage = new SparkStage();
-        stage.setTags(new HashMap<>(this.app.getTags()));
-        stage.setTimestamp(app.getTimestamp());
-        stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
-        stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Integer.toString(stageId));
-        stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Integer.toString(stageAttemptId));
-        stage.setName(name);
-        stage.setNumActiveTasks(0);
-        stage.setNumTasks(numTasks);
-        String schedulingPool = this.app.getConfig().getConfig().get("spark.scheduler.pool");
-        stage.setSchedulingPool(schedulingPool == null ? "default" : schedulingPool);
-
-        String stageKey = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
-        stages.put(stageKey, stage);
-        this.jobStageMap.get(jobId).add(stageKey);
-    }
-
-
-    private SparkExecutor initiateExecutor(String executorID, long startTime) {
-        if (!executors.containsKey(executorID)) {
-            SparkExecutor executor = new SparkExecutor();
-            executor.setTags(new HashMap<>(this.app.getTags()));
-            executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executorID);
-            executor.setStartTime(startTime);
-            executor.setTimestamp(app.getTimestamp());
-
-            this.executors.put(executorID, executor);
-        }
-
-        return this.executors.get(executorID);
-    }
-
-    private String getNormalizedName(String jobName, String assignedName) {
-        if (null != assignedName) {
-            return assignedName;
-        } else {
-            return JobNameNormalization.getInstance().normalize(jobName);
-        }
-    }
-
-    private void flushEntities(Object entity, boolean forceFlush) {
-        this.flushEntities(Collections.singletonList(entity), forceFlush);
-    }
-
-    private void flushEntities(Collection entities, boolean forceFlush) {
-        this.createEntities.addAll(entities);
-
-        if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) {
-            try {
-                this.doFlush(this.createEntities);
-                this.createEntities.clear();
-            } catch (Exception e) {
-                LOG.error("Fail to flush entities", e);
-            }
-
-        }
-    }
-
-    private EagleServiceBaseClient initiateClient() {
-        String host = conf.getString("eagleProps.eagle.service.host");
-        int port = conf.getInt("eagleProps.eagle.service.port");
-        String userName = conf.getString("eagleProps.eagle.service.username");
-        String pwd = conf.getString("eagleProps.eagle.service.password");
-        client = new EagleServiceClientImpl(host, port, userName, pwd);
-        int timeout = conf.getInt("eagleProps.eagle.service.read.timeout");
-        client.getJerseyClient().setReadTimeout(timeout * 1000);
-
-        return client;
-    }
-
-    private void doFlush(List entities) throws IOException, EagleServiceClientException {
-        client.create(entities);
-        int size = (entities == null ? 0 : entities.size());
-        LOG.info("Finished flushing {} entities.", size);
-    }
-}
\ No newline at end of file

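For context on the fallback in getMemoryOverhead() above: when the spark.yarn.*.memoryOverhead key is absent, the reader reproduces the usual Spark-on-YARN convention of max(minimum overhead, executor memory * factor / 100), with the minimum and the per-field factor taken from the feeder's spark.defaultVal.* settings. A minimal, self-contained sketch of that arithmetic, assuming the conventional 384 MB floor and a 10 percent factor in place of the configured values:

    public class MemoryOverheadSketch {
        // Assumed stand-ins for conf.getString("spark.defaultVal.spark.yarn.overhead.min")
        // and conf.getInt("spark.defaultVal.<fieldName>.factor"); the real values come from config.
        private static final long MIN_OVERHEAD_BYTES = 384L * 1024 * 1024;
        private static final int OVERHEAD_FACTOR_PERCENT = 10;

        static long defaultOverhead(long executorMemoryBytes) {
            // Same shape as the fallback branch in getMemoryOverhead().
            return Math.max(MIN_OVERHEAD_BYTES,
                    executorMemoryBytes * OVERHEAD_FACTOR_PERCENT / 100);
        }

        public static void main(String[] args) {
            // A 2 GB executor: 10 percent is ~205 MB, below the 384 MB floor, so the floor wins.
            System.out.println(defaultOverhead(2L * 1024 * 1024 * 1024));
        }
    }
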
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
deleted file mode 100644
index 02fc5cf..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-public class JHFSparkParser implements JHFParserBase {
-
-    private static final Logger logger = LoggerFactory.getLogger(JHFSparkParser.class);
-
-    private boolean isValidJson;
-
-    private JHFSparkEventReader eventReader;
-
-    public JHFSparkParser(JHFSparkEventReader reader) {
-        this.eventReader = reader;
-    }
-
-    @Override
-    public void parse(InputStream is) throws Exception {
-        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
-            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
-                isValidJson = true;
-                JSONObject eventObj = parseAndValidateJSON(line);
-                if (isValidJson) {
-                    try {
-                        this.eventReader.read(eventObj);
-                    } catch (Exception e) {
-                        logger.error("Fail to read event: " + eventObj, e);
-                    }
-                }
-            }
-
-            this.eventReader.clearReader();
-        }
-    }
-
-    private JSONObject parseAndValidateJSON(String line) {
-        JSONObject eventObj = null;
-        JSONParser parser = new JSONParser();
-        try {
-            eventObj = (JSONObject) parser.parse(line);
-        } catch (ParseException ex) {
-            isValidJson = false;
-            logger.error(String.format("Invalid json string. Fail to parse %s.", line), ex);
-        }
-        return eventObj;
-    }
-}
\ No newline at end of file

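JHFSparkParser above treats its input as a Spark event log: newline-delimited JSON, one event object per line, each carrying an "Event" field naming the listener event type, which JHFSparkEventReader uses to route the object to handlers such as handleJobStart or handleStageComplete. A minimal sketch of one such line parsed with the same json-simple calls (the sample event is hand-written for illustration, not taken from a real log):

    import org.json.simple.JSONObject;
    import org.json.simple.parser.JSONParser;

    public class EventLineSketch {
        public static void main(String[] args) throws Exception {
            // One line of a Spark event log: a single JSON object with an "Event" field.
            String line = "{\"Event\":\"SparkListenerApplicationEnd\",\"Timestamp\":1462000000000}";
            JSONObject eventObj = (JSONObject) new JSONParser().parse(line);
            System.out.println(eventObj.get("Event")); // prints SparkListenerApplicationEnd
        }
    }
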
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java
deleted file mode 100644
index 423d045..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-public class SparkApplicationInfo {
-
-    private String state;
-    private String finalStatus;
-    private String queue;
-    private String name;
-    private String user;
-
-    public String getState() {
-        return state;
-    }
-
-    public void setState(String state) {
-        this.state = state;
-    }
-
-    public String getFinalStatus() {
-        return finalStatus;
-    }
-
-    public void setFinalStatus(String finalStatus) {
-        this.finalStatus = finalStatus;
-    }
-
-    public String getQueue() {
-        return queue;
-    }
-
-    public void setQueue(String queue) {
-        this.queue = queue;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getUser() {
-        return user;
-    }
-
-    public void setUser(String user) {
-        this.user = user;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
deleted file mode 100644
index 3964454..0000000
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.crawl;
-
-import org.apache.eagle.jpm.util.SparkJobTagName;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReader {
-
-    private String site;
-    private SparkApplicationInfo app;
-
-
-    public SparkFilesystemInputStreamReaderImpl(String site, SparkApplicationInfo app) {
-        this.site = site;
-        this.app = app;
-    }
-
-    @Override
-    public void read(InputStream is) throws Exception {
-        Map<String, String> baseTags = new HashMap<>();
-        baseTags.put(SparkJobTagName.SITE.toString(), site);
-        baseTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue());
-        JHFParserBase parser = new JHFSparkParser(new JHFSparkEventReader(baseTags, this.app));
-        parser.parse(is);
-    }
-
-    public static void main(String[] args) throws Exception {
-        SparkFilesystemInputStreamReaderImpl impl = new SparkFilesystemInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo());
-        impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1")));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
new file mode 100644
index 0000000..81f266b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
@@ -0,0 +1,33 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.mr.runningentity.JobConfigSerDeser;
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+public class JPMEntityRepository extends EntityRepository {
+    public JPMEntityRepository() {
+        entitySet.add(SparkAppEntity.class);
+        entitySet.add(SparkJobEntity.class);
+        entitySet.add(SparkStageEntity.class);
+        entitySet.add(SparkTaskEntity.class);
+        entitySet.add(SparkExecutorEntity.class);
+        serDeserMap.put(JobConfig.class, new JobConfigSerDeser());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
new file mode 100644
index 0000000..0d3a86f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
@@ -0,0 +1,26 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+public class JobConfig extends HashMap<String, String> implements Serializable {
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
new file mode 100644
index 0000000..51c8a50
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
@@ -0,0 +1,476 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@Table("eagleSparkRunningApps")
+@ColumnFamily("f")
+@Prefix("sparkApp")
+@Service(Constants.RUNNING_SPARK_APP_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "user", "queue"})
+@Partition({"site"})
+public class SparkAppEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private long startTime;
+    @Column("b")
+    private long endTime;
+    @Column("c")
+    private String yarnState;
+    @Column("d")
+    private String yarnStatus;
+    @Column("e")
+    private JobConfig config;
+    @Column("f")
+    private int numJobs;
+    @Column("g")
+    private int totalStages;
+    @Column("h")
+    private int skippedStages;
+    @Column("i")
+    private int failedStages;
+    @Column("j")
+    private int totalTasks;
+    @Column("k")
+    private int skippedTasks;
+    @Column("l")
+    private int failedTasks;
+    @Column("m")
+    private int executors;
+    @Column("n")
+    private long inputBytes;
+    @Column("o")
+    private long inputRecords;
+    @Column("p")
+    private long outputBytes;
+    @Column("q")
+    private long outputRecords;
+    @Column("r")
+    private long shuffleReadBytes;
+    @Column("s")
+    private long shuffleReadRecords;
+    @Column("t")
+    private long shuffleWriteBytes;
+    @Column("u")
+    private long shuffleWriteRecords;
+    @Column("v")
+    private long executorDeserializeTime;
+    @Column("w")
+    private long executorRunTime;
+    @Column("x")
+    private long resultSize;
+    @Column("y")
+    private long jvmGcTime;
+    @Column("z")
+    private long resultSerializationTime;
+    @Column("ab")
+    private long memoryBytesSpilled;
+    @Column("ac")
+    private long diskBytesSpilled;
+    @Column("ad")
+    private long execMemoryBytes;
+    @Column("ae")
+    private long driveMemoryBytes;
+    @Column("af")
+    private int completeTasks;
+    @Column("ag")
+    private long totalExecutorTime;
+    @Column("ah")
+    private long executorMemoryOverhead;
+    @Column("ai")
+    private long driverMemoryOverhead;
+    @Column("aj")
+    private int executorCores;
+    @Column("ak")
+    private int driverCores;
+    @Column("al")
+    private AppInfo appInfo;
+    @Column("am")
+    private int activeStages;
+    @Column("an")
+    private int completeStages;
+    @Column("ba")
+    private int activeTasks;
+
+    public int getActiveTasks() {
+        return activeTasks;
+    }
+
+    public void setActiveTasks(int activeTasks) {
+        this.activeTasks = activeTasks;
+        valueChanged("activeTasks");
+    }
+
+    public int getCompleteStages() {
+        return completeStages;
+    }
+
+    public void setCompleteStages(int completeStages) {
+        this.completeStages = completeStages;
+        valueChanged("completeStages");
+    }
+
+    public int getActiveStages() {
+        return activeStages;
+    }
+
+    public void setActiveStages(int activeStages) {
+        this.activeStages = activeStages;
+        valueChanged("activeStages");
+    }
+
+    public AppInfo getAppInfo() {
+        return appInfo;
+    }
+
+    public void setAppInfo(AppInfo appInfo) {
+        this.appInfo = appInfo;
+        valueChanged("appInfo");
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public String getYarnState() {
+        return yarnState;
+    }
+
+    public String getYarnStatus() {
+        return yarnStatus;
+    }
+
+    public int getNumJobs() {
+        return numJobs;
+    }
+
+    public int getTotalStages() {
+        return totalStages;
+    }
+
+    public int getSkippedStages() {
+        return skippedStages;
+    }
+
+    public int getFailedStages() {
+        return failedStages;
+    }
+
+    public int getTotalTasks() {
+        return totalTasks;
+    }
+
+    public int getSkippedTasks() {
+        return skippedTasks;
+    }
+
+    public int getFailedTasks() {
+        return failedTasks;
+    }
+
+    public int getExecutors() {
+        return executors;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public long getOutputRecords() {
+        return outputRecords;
+    }
+
+    public long getShuffleReadBytes() {
+        return shuffleReadBytes;
+    }
+
+    public long getShuffleReadRecords() {
+        return shuffleReadRecords;
+    }
+
+    public long getShuffleWriteBytes() {
+        return shuffleWriteBytes;
+    }
+
+    public long getShuffleWriteRecords() {
+        return shuffleWriteRecords;
+    }
+
+    public long getExecutorDeserializeTime() {
+        return executorDeserializeTime;
+    }
+
+    public long getExecutorRunTime() {
+        return executorRunTime;
+    }
+
+    public long getResultSize() {
+        return resultSize;
+    }
+
+    public long getJvmGcTime() {
+        return jvmGcTime;
+    }
+
+    public long getResultSerializationTime() {
+        return resultSerializationTime;
+    }
+
+    public long getMemoryBytesSpilled() {
+        return memoryBytesSpilled;
+    }
+
+    public long getDiskBytesSpilled() {
+        return diskBytesSpilled;
+    }
+
+    public long getExecMemoryBytes() {
+        return execMemoryBytes;
+    }
+
+    public long getDriveMemoryBytes() {
+        return driveMemoryBytes;
+    }
+
+    public int getCompleteTasks() {
+        return completeTasks;
+    }
+
+    public JobConfig getConfig() {
+        return config;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        valueChanged("endTime");
+    }
+
+    public void setYarnState(String yarnState) {
+        this.yarnState = yarnState;
+        valueChanged("yarnState");
+    }
+
+    public void setYarnStatus(String yarnStatus) {
+        this.yarnStatus = yarnStatus;
+        valueChanged("yarnStatus");
+    }
+
+    public void setConfig(JobConfig config) {
+        this.config = config;
+        valueChanged("config");
+    }
+
+    public void setNumJobs(int numJobs) {
+        this.numJobs = numJobs;
+        valueChanged("numJobs");
+    }
+
+    public void setTotalStages(int totalStages) {
+        this.totalStages = totalStages;
+        valueChanged("totalStages");
+    }
+
+    public void setSkippedStages(int skippedStages) {
+        this.skippedStages = skippedStages;
+        valueChanged("skippedStages");
+    }
+
+    public void setFailedStages(int failedStages) {
+        this.failedStages = failedStages;
+        valueChanged("failedStages");
+    }
+
+    public void setTotalTasks(int totalTasks) {
+        this.totalTasks = totalTasks;
+        valueChanged("totalTasks");
+    }
+
+    public void setSkippedTasks(int skippedTasks) {
+        this.skippedTasks = skippedTasks;
+        valueChanged("skippedTasks");
+    }
+
+    public void setFailedTasks(int failedTasks) {
+        this.failedTasks = failedTasks;
+        valueChanged("failedTasks");
+    }
+
+    public void setExecutors(int executors) {
+        this.executors = executors;
+        valueChanged("executors");
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+        valueChanged("inputBytes");
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+        valueChanged("inputRecords");
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+        valueChanged("outputBytes");
+    }
+
+    public void setOutputRecords(long outputRecords) {
+        this.outputRecords = outputRecords;
+        valueChanged("outputRecords");
+    }
+
+    public void setShuffleReadBytes(long shuffleReadBytes) {
+        this.shuffleReadBytes = shuffleReadBytes;
+        valueChanged("shuffleReadBytes");
+    }
+
+    public void setShuffleReadRecords(long shuffleReadRecords) {
+        this.shuffleReadRecords = shuffleReadRecords;
+        valueChanged("shuffleReadRecords");
+    }
+
+    public void setShuffleWriteBytes(long shuffleWriteBytes) {
+        this.shuffleWriteBytes = shuffleWriteBytes;
+        valueChanged("shuffleWriteBytes");
+    }
+
+    public void setShuffleWriteRecords(long shuffleWriteRecords) {
+        this.shuffleWriteRecords = shuffleWriteRecords;
+        valueChanged("shuffleWriteRecords");
+    }
+
+    public void setExecutorDeserializeTime(long executorDeserializeTime) {
+        this.executorDeserializeTime = executorDeserializeTime;
+        valueChanged("executorDeserializeTime");
+    }
+
+    public void setExecutorRunTime(long executorRunTime) {
+        this.executorRunTime = executorRunTime;
+        valueChanged("executorRunTime");
+    }
+
+    public void setResultSize(long resultSize) {
+        this.resultSize = resultSize;
+        valueChanged("resultSize");
+    }
+
+    public void setJvmGcTime(long jvmGcTime) {
+        this.jvmGcTime = jvmGcTime;
+        valueChanged("jvmGcTime");
+    }
+
+    public void setResultSerializationTime(long resultSerializationTime) {
+        this.resultSerializationTime = resultSerializationTime;
+        valueChanged("resultSerializationTime");
+    }
+
+    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+        this.memoryBytesSpilled = memoryBytesSpilled;
+        valueChanged("memoryBytesSpilled");
+    }
+
+    public void setDiskBytesSpilled(long diskBytesSpilled) {
+        this.diskBytesSpilled = diskBytesSpilled;
+        valueChanged("diskBytesSpilled");
+    }
+
+    public void setExecMemoryBytes(long execMemoryBytes) {
+        this.execMemoryBytes = execMemoryBytes;
+        valueChanged("execMemoryBytes");
+    }
+
+    public void setDriveMemoryBytes(long driveMemoryBytes) {
+        this.driveMemoryBytes = driveMemoryBytes;
+        valueChanged("driveMemoryBytes");
+    }
+
+    public void setCompleteTasks(int completeTasks) {
+        this.completeTasks = completeTasks;
+        valueChanged("completeTasks");
+    }
+
+    public long getTotalExecutorTime() {
+        return totalExecutorTime;
+    }
+
+    public void setTotalExecutorTime(long totalExecutorTime) {
+        this.totalExecutorTime = totalExecutorTime;
+        valueChanged("totalExecutorTime");
+    }
+
+    public long getExecutorMemoryOverhead() {
+        return executorMemoryOverhead;
+    }
+
+    public void setExecutorMemoryOverhead(long executorMemoryOverhead) {
+        this.executorMemoryOverhead = executorMemoryOverhead;
+        valueChanged("executorMemoryOverhead");
+    }
+
+    public long getDriverMemoryOverhead() {
+        return driverMemoryOverhead;
+    }
+
+    public void setDriverMemoryOverhead(long driverMemoryOverhead) {
+        this.driverMemoryOverhead = driverMemoryOverhead;
+        valueChanged("driverMemoryOverhead");
+    }
+
+    public int getExecutorCores() {
+        return executorCores;
+    }
+
+    public void setExecutorCores(int executorCores) {
+        this.executorCores = executorCores;
+        valueChanged("executorCores");
+    }
+
+    public int getDriverCores() {
+        return driverCores;
+    }
+
+    public void setDriverCores(int driverCores) {
+        this.driverCores = driverCores;
+        valueChanged("driverCores");
+    }
+}

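Each setter above pairs its assignment with valueChanged("<field>"), which lets TaggedLogAPIEntity track which fields have been modified and need to be written. A minimal sketch of populating the entity (the tag values are made up for illustration; the tag keys match the @Tags declaration):

    import java.util.HashMap;
    import java.util.Map;

    public class SparkAppEntitySketch {
        public static void main(String[] args) {
            SparkAppEntity app = new SparkAppEntity();
            Map<String, String> tags = new HashMap<>();
            tags.put("site", "sandbox");                          // illustrative tag values
            tags.put("sparkAppId", "application_1462000000000_0001");
            tags.put("sparkAppAttemptId", "1");
            app.setTags(tags);
            app.setTimestamp(System.currentTimeMillis());
            app.setStartTime(System.currentTimeMillis());         // marks "startTime" via valueChanged()
        }
    }
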
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
new file mode 100644
index 0000000..6d0441c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
@@ -0,0 +1,233 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@Table("eagleSparkRunningExecutors")
+@ColumnFamily("f")
+@Prefix("sparkExecutor")
+@Service(Constants.RUNNING_SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "executorId","user", "queue"})
+@Partition({"site"})
+public class SparkExecutorEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private String hostPort;
+    @Column("b")
+    private int rddBlocks;
+    @Column("c")
+    private long memoryUsed;
+    @Column("d")
+    private long diskUsed;
+    @Column("e")
+    private int activeTasks = 0;
+    @Column("f")
+    private int failedTasks = 0;
+    @Column("g")
+    private int completedTasks = 0;
+    @Column("h")
+    private int totalTasks = 0;
+    @Column("i")
+    private long totalDuration = 0;
+    @Column("j")
+    private long totalInputBytes = 0;
+    @Column("k")
+    private long totalShuffleRead = 0;
+    @Column("l")
+    private long totalShuffleWrite = 0;
+    @Column("m")
+    private long maxMemory;
+    @Column("n")
+    private long startTime;
+    @Column("o")
+    private long endTime = 0;
+    @Column("p")
+    private long execMemoryBytes;
+    @Column("q")
+    private int cores;
+    @Column("r")
+    private long memoryOverhead;
+
+    public String getHostPort() {
+        return hostPort;
+    }
+
+    public void setHostPort(String hostPort) {
+        this.hostPort = hostPort;
+        this.valueChanged("hostPort");
+    }
+
+    public int getRddBlocks() {
+        return rddBlocks;
+    }
+
+    public void setRddBlocks(int rddBlocks) {
+        this.rddBlocks = rddBlocks;
+        this.valueChanged("rddBlocks");
+    }
+
+    public long getMemoryUsed() {
+        return memoryUsed;
+    }
+
+    public void setMemoryUsed(long memoryUsed) {
+        this.memoryUsed = memoryUsed;
+        this.valueChanged("memoryUsed");
+    }
+
+    public long getDiskUsed() {
+        return diskUsed;
+    }
+
+    public void setDiskUsed(long diskUsed) {
+        this.diskUsed = diskUsed;
+        this.valueChanged("diskUsed");
+    }
+
+    public int getActiveTasks() {
+        return activeTasks;
+    }
+
+    public void setActiveTasks(int activeTasks) {
+        this.activeTasks = activeTasks;
+        this.valueChanged("activeTasks");
+    }
+
+    public int getFailedTasks() {
+        return failedTasks;
+    }
+
+    public void setFailedTasks(int failedTasks) {
+        this.failedTasks = failedTasks;
+        this.valueChanged("failedTasks");
+    }
+
+    public int getCompletedTasks() {
+        return completedTasks;
+    }
+
+    public void setCompletedTasks(int completedTasks) {
+        this.completedTasks = completedTasks;
+        this.valueChanged("completedTasks");
+    }
+
+    public int getTotalTasks() {
+        return totalTasks;
+    }
+
+    public void setTotalTasks(int totalTasks) {
+        this.totalTasks = totalTasks;
+        this.valueChanged("totalTasks");
+    }
+
+    public long getTotalDuration() {
+        return totalDuration;
+    }
+
+    public void setTotalDuration(long totalDuration) {
+        this.totalDuration = totalDuration;
+        this.valueChanged("totalDuration");
+    }
+
+    public long getTotalInputBytes() {
+        return totalInputBytes;
+    }
+
+    public void setTotalInputBytes(long totalInputBytes) {
+        this.totalInputBytes = totalInputBytes;
+        this.valueChanged("totalInputBytes");
+    }
+
+    public long getTotalShuffleRead() {
+        return totalShuffleRead;
+    }
+
+    public void setTotalShuffleRead(long totalShuffleRead) {
+        this.totalShuffleRead = totalShuffleRead;
+        this.valueChanged("totalShuffleRead");
+    }
+
+    public long getTotalShuffleWrite() {
+        return totalShuffleWrite;
+    }
+
+    public void setTotalShuffleWrite(long totalShuffleWrite) {
+        this.totalShuffleWrite = totalShuffleWrite;
+        this.valueChanged("totalShuffleWrite");
+    }
+
+    public long getMaxMemory() {
+        return maxMemory;
+    }
+
+    public void setMaxMemory(long maxMemory) {
+        this.maxMemory = maxMemory;
+        this.valueChanged("maxMemory");
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+        valueChanged("startTime");
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+        this.valueChanged("endTime");
+    }
+
+    public long getExecMemoryBytes() {
+        return execMemoryBytes;
+    }
+
+    public void setExecMemoryBytes(long execMemoryBytes) {
+        this.execMemoryBytes = execMemoryBytes;
+        this.valueChanged("execMemoryBytes");
+    }
+
+    public int getCores() {
+        return cores;
+    }
+
+    public void setCores(int cores) {
+        this.cores = cores;
+        valueChanged("cores");
+    }
+
+    public long getMemoryOverhead() {
+        return memoryOverhead;
+    }
+
+    public void setMemoryOverhead(long memoryOverhead) {
+        this.memoryOverhead = memoryOverhead;
+        valueChanged("memoryOverhead");
+    }
+}