Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:08 UTC

[12/52] [abbrv] incubator-eagle git commit: EAGLE-492 Fix negative total execution time and make some code optimizations for the Spark history job. 1) negative total execution time: the app was killed or terminated unexpectedly without completion

EAGLE-492 Fix negative total execution time and make some code optimizations for the Spark history job.
1) Negative total execution time
This happens when the app is killed or terminated unexpectedly without completing. In that case the event log contains only a SparkListenerBlockManagerAdded timestamp ("Timestamp":1470768362299), and the code initialized the executor start time from it.
With no executor or app end time available, the executor end time stayed 0L, giving totalExecutionTime = 0L - 1470768362299 = -1470768362299.
The code logic did not consider the situation where the executor or app end time is 0L.
Solution: if totalExecutionTime < 0L, set it to 0L, indicating the app was killed and never completed.
2) URL builder bug for completed jobs
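
In short, as arithmetic (a minimal sketch of the failure and the guard, using the timestamp from the log excerpt above; not the patch itself, which is in JHFSparkEventReader below):

    long startTime = 1470768362299L;  // SparkListenerBlockManagerAdded "Timestamp"
    long endTime = 0L;                // killed app: no executor/app end event in the log
    long totalExecutionTime = endTime - startTime;  // = -1470768362299
    if (totalExecutionTime < 0L) {
        totalExecutionTime = 0L;      // report 0: app was killed and did not complete
    }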

https://issues.apache.org/jira/browse/EAGLE-492

Author: @pkuwm <ih...@gmail.com>
Reviewer: @DadanielZ <da...@apache.org>

Closes #375


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/6f5f972c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/6f5f972c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/6f5f972c

Branch: refs/heads/master
Commit: 6f5f972c966ea1edc2adf0573dbdb1b0d5ff9671
Parents: 24e6622
Author: DadanielZ <da...@apache.org>
Authored: Wed Aug 24 17:19:22 2016 -0700
Committer: DadanielZ <da...@apache.org>
Committed: Wed Aug 24 17:19:22 2016 -0700

----------------------------------------------------------------------
 .../jpm/spark/crawl/JHFSparkEventReader.java    | 174 +++++++++----------
 .../eagle/jpm/spark/crawl/JHFSparkParser.java   |  45 +++--
 .../history/storm/FinishedSparkJobSpout.java    |  12 +-
 .../spark/history/storm/SparkJobParseBolt.java  |  36 ++--
 .../java/org/apache/eagle/jpm/util/Utils.java   |   5 +-
 .../util/resourceFetch/RMResourceFetcher.java   |  41 ++---
 .../resourceFetch/ha/HAURLSelectorImpl.java     |   4 +-
 .../SparkCompleteJobServiceURLBuilderImpl.java  |  11 +-
 8 files changed, 163 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f5f972c/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
index e298fa3..a5e630a 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
@@ -18,10 +18,7 @@
 package org.apache.eagle.jpm.spark.crawl;
 
 import org.apache.eagle.jpm.spark.entity.*;
-import org.apache.eagle.jpm.util.JSONUtil;
-import org.apache.eagle.jpm.util.JobNameNormalization;
-import org.apache.eagle.jpm.util.SparkEntityConstant;
-import org.apache.eagle.jpm.util.SparkJobTagName;
+import org.apache.eagle.jpm.util.*;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
@@ -37,8 +34,9 @@ import java.util.*;
 public class JHFSparkEventReader {
     private static final Logger LOG = LoggerFactory.getLogger(JHFSparkEventReader.class);
 
-    public static final int FLUSH_LIMIT = 500;
+    private static final int FLUSH_LIMIT = 500;
     private long firstTaskLaunchTime;
+    private long lastEventTime;
 
     private Map<String, SparkExecutor> executors;
     private SparkApp app;
@@ -75,7 +73,6 @@ public class JHFSparkEventReader {
 
     public void read(JSONObject eventObj) throws Exception {
         String eventType = (String) eventObj.get("Event");
-        LOG.info("Event type: " + eventType);
         if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationStart.toString())) {
             handleAppStarted(eventObj);
         } else if (eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString())) {
@@ -108,7 +105,6 @@ public class JHFSparkEventReader {
 
     }
 
-
     private void handleEnvironmentSet(JSONObject event) {
         app.setConfig(new JobConfig());
         JSONObject sparkProps = (JSONObject) event.get("Spark Properties");
@@ -142,16 +138,10 @@ public class JHFSparkEventReader {
         }
     }
 
-
     private boolean isClientMode(JobConfig config) {
-        if (config.getConfig().get("spark.master").equalsIgnoreCase("yarn-client")) {
-            return true;
-        } else {
-            return false;
-        }
+        return config.getConfig().get("spark.master").equalsIgnoreCase("yarn-client");
     }
 
-
     private void handleAppStarted(JSONObject event) {
         //need update all entities tag before app start
         List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
@@ -174,11 +164,14 @@ public class JHFSparkEventReader {
         }
 
         this.app.setStartTime(appStartTime);
+        this.lastEventTime = appStartTime;
     }
 
     private void handleExecutorAdd(JSONObject event) throws Exception {
         String executorID = (String) event.get("Executor ID");
-        SparkExecutor executor = this.initiateExecutor(executorID, JSONUtil.getLong(event, "Timestamp"));
+        long executorAddTime = JSONUtil.getLong(event, "Timestamp");
+        this.lastEventTime = executorAddTime;
+        SparkExecutor executor = this.initiateExecutor(executorID, executorAddTime);
 
         JSONObject executorInfo = JSONUtil.getJSONObject(event, "Executor Info");
 
@@ -187,6 +180,7 @@ public class JHFSparkEventReader {
     private void handleBlockManagerAdd(JSONObject event) throws Exception {
         long maxMemory = JSONUtil.getLong(event, "Maximum Memory");
         long timestamp = JSONUtil.getLong(event, "Timestamp");
+        this.lastEventTime = timestamp;
         JSONObject blockInfo = JSONUtil.getJSONObject(event, "Block Manager ID");
         String executorID = JSONUtil.getString(blockInfo, "Executor ID");
         String hostport = String.format("%s:%s", JSONUtil.getString(blockInfo, "Host"), JSONUtil.getLong(blockInfo, "Port"));
@@ -202,11 +196,9 @@ public class JHFSparkEventReader {
 
     private void handleTaskEnd(JSONObject event) {
         JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info");
-        Integer taskId = JSONUtil.getInt(taskInfo, "Task ID");
-        SparkTask task = null;
-        if (tasks.containsKey(taskId)) {
-            task = tasks.get(taskId);
-        } else {
+        int taskId = JSONUtil.getInt(taskInfo, "Task ID");
+        SparkTask task = tasks.get(taskId);
+        if (task == null) {
             return;
         }
 
@@ -261,19 +253,20 @@ public class JHFSparkEventReader {
 
     private SparkTask initializeTask(JSONObject event) {
         SparkTask task = new SparkTask();
-        task.setTags(new HashMap(this.app.getTags()));
+        task.setTags(new HashMap<>(this.app.getTags()));
         task.setTimestamp(app.getTimestamp());
 
-        task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), JSONUtil.getLong(event, "Stage ID").toString());
-        task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), JSONUtil.getLong(event, "Stage Attempt ID").toString());
+        task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Long.toString(JSONUtil.getLong(event, "Stage ID")));
+        task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Long.toString(JSONUtil.getLong(event, "Stage Attempt ID")));
 
         JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info");
         int taskId = JSONUtil.getInt(taskInfo, "Task ID");
         task.setTaskId(taskId);
 
-        task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), JSONUtil.getInt(taskInfo, "Index").toString());
-        task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), JSONUtil.getInt(taskInfo, "Attempt").toString());
+        task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), Integer.toString(JSONUtil.getInt(taskInfo, "Index")));
+        task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), Integer.toString(JSONUtil.getInt(taskInfo, "Attempt")));
         long launchTime = JSONUtil.getLong(taskInfo, "Launch Time");
+        this.lastEventTime = launchTime;
         if (taskId == 0) {
             this.setFirstTaskLaunchTime(launchTime);
         }
@@ -295,15 +288,16 @@ public class JHFSparkEventReader {
         return this.firstTaskLaunchTime;
     }
 
-
     private void handleJobStart(JSONObject event) {
         SparkJob job = new SparkJob();
-        job.setTags(new HashMap(this.app.getTags()));
+        job.setTags(new HashMap<>(this.app.getTags()));
         job.setTimestamp(app.getTimestamp());
 
-        Integer jobId = JSONUtil.getInt(event, "Job ID");
-        job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), jobId.toString());
-        job.setSubmissionTime(JSONUtil.getLong(event, "Submission Time"));
+        int jobId = JSONUtil.getInt(event, "Job ID");
+        job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
+        long submissionTime = JSONUtil.getLong(event, "Submission Time");
+        job.setSubmissionTime(submissionTime);
+        this.lastEventTime = submissionTime;
 
         //for complete application, no active stages/tasks
         job.setNumActiveStages(0);
@@ -316,8 +310,8 @@ public class JHFSparkEventReader {
         job.setNumStages(stages.size());
         for (int i = 0; i < stages.size(); i++) {
             JSONObject stageInfo = (JSONObject) stages.get(i);
-            Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID");
-            Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
+            int stageId = JSONUtil.getInt(stageInfo, "Stage ID");
+            int stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
             String stageName = JSONUtil.getString(stageInfo, "Stage Name");
             int numTasks = JSONUtil.getInt(stageInfo, "Number of Tasks");
             this.initiateStage(jobId, stageId, stageAttemptId, stageName, numTasks);
@@ -326,14 +320,14 @@ public class JHFSparkEventReader {
 
     private void handleStageSubmit(JSONObject event) {
         JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info");
-        Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID");
-        Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
-        String key = this.generateStageKey(stageId.toString(), stageAttemptId.toString());
+        int stageId = JSONUtil.getInt(stageInfo, "Stage ID");
+        int stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
+        String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
         stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>());
 
-        if (!stages.containsKey(this.generateStageKey(stageId.toString(), stageAttemptId.toString()))) {
+        if (!stages.containsKey(this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId)))) {
             //may be further attempt for one stage
-            String baseAttempt = this.generateStageKey(stageId.toString(), "0");
+            String baseAttempt = this.generateStageKey(Integer.toString(stageId), "0");
             if (stages.containsKey(baseAttempt)) {
                 SparkStage stage = stages.get(baseAttempt);
                 String jobId = stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString());
@@ -343,14 +337,13 @@ public class JHFSparkEventReader {
                 this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks);
             }
         }
-
     }
 
     private void handleStageComplete(JSONObject event) {
         JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info");
-        Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID");
-        Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
-        String key = this.generateStageKey(stageId.toString(), stageAttemptId.toString());
+        int stageId = JSONUtil.getInt(stageInfo, "Stage ID");
+        int stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
+        String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
         SparkStage stage = stages.get(key);
 
         // If "Submission Time" is not available, use the "Launch Time" of "Task ID" = 0.
@@ -359,7 +352,10 @@ public class JHFSparkEventReader {
             submissionTime = this.getFirstTaskLaunchTime();
         }
         stage.setSubmitTime(submissionTime);
-        stage.setCompleteTime(JSONUtil.getLong(stageInfo, "Completion Time"));
+
+        long completeTime = JSONUtil.getLong(stageInfo, "Completion Time");
+        stage.setCompleteTime(completeTime);
+        this.lastEventTime = completeTime;
 
         if (stageInfo.containsKey("Failure Reason")) {
             stage.setStatus(SparkEntityConstant.SparkStageStatus.FAILED.toString());
@@ -371,14 +367,19 @@ public class JHFSparkEventReader {
     private void handleExecutorRemoved(JSONObject event) {
         String executorID = JSONUtil.getString(event, "Executor ID");
         SparkExecutor executor = executors.get(executorID);
-        executor.setEndTime(JSONUtil.getLong(event, "Timestamp"));
-
+        long removedTime = JSONUtil.getLong(event, "Timestamp");
+        executor.setEndTime(removedTime);
+        this.lastEventTime = removedTime;
     }
 
     private void handleJobEnd(JSONObject event) {
-        Integer jobId = JSONUtil.getInt(event, "Job ID");
+        int jobId = JSONUtil.getInt(event, "Job ID");
         SparkJob job = jobs.get(jobId);
-        job.setCompletionTime(JSONUtil.getLong(event, "Completion Time"));
+
+        long completionTime = JSONUtil.getLong(event, "Completion Time");
+        job.setCompletionTime(completionTime);
+        this.lastEventTime = completionTime;
+
         JSONObject jobResult = JSONUtil.getJSONObject(event, "Job Result");
         String result = JSONUtil.getString(jobResult, "Result");
         if (result.equalsIgnoreCase("JobSucceeded")) {
@@ -391,6 +392,7 @@ public class JHFSparkEventReader {
     private void handleAppEnd(JSONObject event) {
         long endTime = JSONUtil.getLong(event, "Timestamp");
         app.setEndTime(endTime);
+        this.lastEventTime = endTime;
     }
 
     public void clearReader() throws Exception {
@@ -405,7 +407,7 @@ public class JHFSparkEventReader {
 
         List<SparkStage> needStoreStages = new ArrayList<>();
         for (SparkStage stage : this.stages.values()) {
-            Integer jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
+            int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
             if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) {
                 SparkJob job = this.jobs.get(jobId);
                 job.setNumSkippedStages(job.getNumSkippedStages() + 1);
@@ -427,8 +429,9 @@ public class JHFSparkEventReader {
         this.flushEntities(jobs.values(), false);
 
         app.setExecutors(executors.values().size());
-        long executorMemory = parseExecutorMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName()));
-        long driverMemory = parseExecutorMemory(this.isClientMode(app.getConfig())
+
+        long executorMemory = Utils.parseMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName()));
+        long driverMemory = Utils.parseMemory(this.isClientMode(app.getConfig())
             ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName())
             : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName()));
 
@@ -460,7 +463,10 @@ public class JHFSparkEventReader {
                 executor.setCores(executorCore);
                 executor.setMemoryOverhead(executorMemoryOverhead);
             }
-            if (executor.getEndTime() == 0) {
+            if (app.getEndTime() <= 0L) {
+                app.setEndTime(this.lastEventTime);
+            }
+            if (executor.getEndTime() <= 0L) {
                 executor.setEndTime(app.getEndTime());
             }
             this.aggregateExecutorToApp(executor);
@@ -473,21 +479,28 @@ public class JHFSparkEventReader {
 
     private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) {
         long result = 0L;
-        if (config.getConfig().containsKey(fieldName)) {
-            result = this.parseExecutorMemory(config.getConfig().get(fieldName) + "m");
+        String fieldValue = config.getConfig().get(fieldName);
+        if (fieldValue != null) {
+            result = Utils.parseMemory(fieldValue + "m");
             if (result == 0L) {
-                result = this.parseExecutorMemory(config.getConfig().get(fieldName));
+               result = Utils.parseMemory(fieldValue);
             }
         }
 
         if (result == 0L) {
-            result = Math.max(this.parseExecutorMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")), executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100);
+            result = Math.max(
+                    Utils.parseMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")),
+                    executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100);
         }
         return result;
     }
 
     private void aggregateExecutorToApp(SparkExecutor executor) {
-        app.setTotalExecutorTime(app.getTotalExecutorTime() + (executor.getEndTime() - executor.getStartTime()));
+        long totalExecutorTime = app.getTotalExecutorTime() + executor.getEndTime() - executor.getStartTime();
+        if (totalExecutorTime < 0L) {
+            totalExecutorTime = 0L;
+        }
+        app.setTotalExecutorTime(totalExecutorTime);
     }
 
     private void aggregateJobToApp(SparkJob job) {
@@ -589,7 +602,7 @@ public class JHFSparkEventReader {
     }
 
     private void aggregateToJob(SparkStage stage) {
-        Integer jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
+        int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
         SparkJob job = jobs.get(jobId);
         job.setNumCompletedTasks(job.getNumCompletedTasks() + stage.getNumCompletedTasks());
         job.setNumFailedTasks(job.getNumFailedTasks() + stage.getNumFailedTasks());
@@ -601,16 +614,16 @@ public class JHFSparkEventReader {
             if (!hasStagePriorAttemptSuccess(stage)) {
                 job.setNumCompletedStages(job.getNumCompletedStages() + 1);
             }
-
         } else {
             job.setNumFailedStages(job.getNumFailedStages() + 1);
         }
     }
 
     private boolean hasStagePriorAttemptSuccess(SparkStage stage) {
-        Integer stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()));
-        for (Integer i = 0; i < stageAttemptId; i++) {
-            SparkStage previousStage = stages.get(this.generateStageKey(stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), i.toString()));
+        int stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()));
+        for (int i = 0; i < stageAttemptId; i++) {
+            SparkStage previousStage = stages.get(this.generateStageKey(
+                    stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), Integer.toString(i)));
             if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
                 return true;
             }
@@ -623,19 +636,20 @@ public class JHFSparkEventReader {
         return String.format("%s-%s", stageId, stageAttemptId);
     }
 
-    private void initiateStage(Integer jobId, Integer stageId, Integer stageAttemptId, String name, int numTasks) {
+    private void initiateStage(int jobId, int stageId, int stageAttemptId, String name, int numTasks) {
         SparkStage stage = new SparkStage();
-        stage.setTags(new HashMap(this.app.getTags()));
+        stage.setTags(new HashMap<>(this.app.getTags()));
         stage.setTimestamp(app.getTimestamp());
-        stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), jobId.toString());
-        stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), stageId.toString());
-        stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), stageAttemptId.toString());
+        stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
+        stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Integer.toString(stageId));
+        stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Integer.toString(stageAttemptId));
         stage.setName(name);
         stage.setNumActiveTasks(0);
         stage.setNumTasks(numTasks);
-        stage.setSchedulingPool(this.app.getConfig().getConfig().get("spark.scheduler.pool") == null ? "default" : this.app.getConfig().getConfig().get("spark.scheduler.pool"));
+        stage.setSchedulingPool(this.app.getConfig().getConfig().get("spark.scheduler.pool") == null ?
+                "default" : this.app.getConfig().getConfig().get("spark.scheduler.pool"));
 
-        String stageKey = this.generateStageKey(stageId.toString(), stageAttemptId.toString());
+        String stageKey = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
         stages.put(stageKey, stage);
         this.jobStageMap.get(jobId).add(stageKey);
     }
@@ -644,7 +658,7 @@ public class JHFSparkEventReader {
     private SparkExecutor initiateExecutor(String executorID, long startTime) throws Exception {
         if (!executors.containsKey(executorID)) {
             SparkExecutor executor = new SparkExecutor();
-            executor.setTags(new HashMap(this.app.getTags()));
+            executor.setTags(new HashMap<>(this.app.getTags()));
             executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executorID);
             executor.setStartTime(startTime);
             executor.setTimestamp(app.getTimestamp());
@@ -663,28 +677,6 @@ public class JHFSparkEventReader {
         }
     }
 
-    private long parseExecutorMemory(String memory) {
-
-        if (memory.endsWith("g") || memory.endsWith("G")) {
-            int executorGB = Integer.parseInt(memory.substring(0, memory.length() - 1));
-            return 1024L * 1024 * 1024 * executorGB;
-        } else if (memory.endsWith("m") || memory.endsWith("M")) {
-            int executorMB = Integer.parseInt(memory.substring(0, memory.length() - 1));
-            return 1024L * 1024 * executorMB;
-        } else if (memory.endsWith("k") || memory.endsWith("K")) {
-            int executorKB = Integer.parseInt(memory.substring(0, memory.length() - 1));
-            return 1024L * executorKB;
-        } else if (memory.endsWith("t") || memory.endsWith("T")) {
-            int executorTB = Integer.parseInt(memory.substring(0, memory.length() - 1));
-            return 1024L * 1024 * 1024 * 1024 * executorTB;
-        } else if (memory.endsWith("p") || memory.endsWith("P")) {
-            int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1));
-            return 1024L * 1024 * 1024 * 1024 * 1024 * executorPB;
-        }
-        LOG.info("Cannot parse memory info " + memory);
-        return 0L;
-    }
-
     private void flushEntities(Object entity, boolean forceFlush) {
         this.flushEntities(Arrays.asList(entity), forceFlush);
     }
@@ -719,4 +711,4 @@ public class JHFSparkEventReader {
         LOG.info("start flushing entities of total number " + entities.size());
         LOG.info("finish flushing entities of total number " + entities.size());
     }
-}
+}
\ No newline at end of file
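
Taken together, the lastEventTime bookkeeping added above gives a killed app a usable end time at flush: the latest event timestamp backstops a missing app end time, which in turn backstops missing executor end times. A standalone sketch of that fallback chain (hypothetical class name; values from the example in the commit message):

    public class EndTimeFallbackSketch {
        public static void main(String[] args) {
            long lastEventTime = 1470768362299L; // latest timestamp seen while reading the event log
            long appEndTime = 0L;                // no SparkListenerApplicationEnd was logged
            long executorEndTime = 0L;           // no SparkListenerExecutorRemoved was logged

            if (appEndTime <= 0L) {
                appEndTime = lastEventTime;      // backstop the missing app end time
            }
            if (executorEndTime <= 0L) {
                executorEndTime = appEndTime;    // executors inherit the app end time
            }
            // end - start can no longer be a large negative number
            System.out.println(executorEndTime - lastEventTime); // 0
        }
    }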

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f5f972c/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
index da049ea..05cdd7e 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
@@ -20,6 +20,7 @@ package org.apache.eagle.jpm.spark.crawl;
 
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,7 +32,9 @@ public class JHFSparkParser implements JHFParserBase {
 
     private static final Logger logger = LoggerFactory.getLogger(JHFSparkParser.class);
 
-    JHFSparkEventReader eventReader;
+    private boolean isValidJson;
+
+    private JHFSparkEventReader eventReader;
 
     public JHFSparkParser(JHFSparkEventReader reader) {
         this.eventReader = reader;
@@ -39,26 +42,32 @@ public class JHFSparkParser implements JHFParserBase {
 
     @Override
     public void parse(InputStream is) throws Exception {
-        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-        try {
-            String line;
-
-            JSONParser parser = new JSONParser();
-            while ((line = reader.readLine()) != null) {
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
+            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+                isValidJson = true;
                 try {
-                    JSONObject eventObj = (JSONObject) parser.parse(line);
-                    String eventType = (String) eventObj.get("Event");
-                    logger.info("Event type: " + eventType);
-                    this.eventReader.read(eventObj);
-                } catch (Exception e) {
-                    logger.error(String.format("Invalid json string. Fail to parse %s.", line), e);
+                    JSONObject eventObj = parseAndValidateJSON(line);
+                    if (isValidJson) {
+                        this.eventReader.read(eventObj);
+                    }
+                } catch (Exception e) {
+                    logger.error(String.format("Fail to parse %s.", line), e);
                 }
             }
+
             this.eventReader.clearReader();
-        } finally {
-            if (reader != null) {
-                reader.close();
-            }
         }
     }
-}
+
+    private JSONObject parseAndValidateJSON(String line) {
+        JSONObject eventObj = null;
+        JSONParser parser = new JSONParser();
+        try {
+            eventObj = (JSONObject) parser.parse(line);
+        } catch (ParseException ex) {
+            isValidJson = false;
+            logger.error(String.format("Invalid json string. Fail to parse %s.", line), ex);
+        }
+        return eventObj;
+    }
+}
\ No newline at end of file
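
For reference, the per-line parse-then-validate pattern the refactoring settles on, as a standalone sketch against the json-simple API (the eventReader side is replaced by a println; the second input line is deliberately malformed):

    import org.json.simple.JSONObject;
    import org.json.simple.parser.JSONParser;
    import org.json.simple.parser.ParseException;

    import java.io.BufferedReader;
    import java.io.StringReader;

    public class PerLineJsonSketch {
        public static void main(String[] args) throws Exception {
            String log = "{\"Event\":\"SparkListenerApplicationEnd\",\"Timestamp\":1470768362299}\n"
                    + "not json at all\n";
            JSONParser parser = new JSONParser();
            try (BufferedReader reader = new BufferedReader(new StringReader(log))) {
                for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                    try {
                        JSONObject eventObj = (JSONObject) parser.parse(line);
                        System.out.println("event: " + eventObj.get("Event"));
                    } catch (ParseException ex) {
                        // a bad line is logged and skipped; it no longer aborts the whole file
                        System.err.println("Invalid json string. Fail to parse " + line);
                    }
                }
            }
        }
    }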

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f5f972c/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
index 8965d3d..bf04b55 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
@@ -76,11 +76,13 @@ public class FinishedSparkJobSpout extends BaseRichSpout {
             if (fetchTime - this.lastFinishAppTime > this.config.stormConfig.spoutCrawlInterval) {
                 List<AppInfo> appInfos = rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, Long.toString(lastFinishAppTime));
                 //List<AppInfo> appInfos = (null != apps ? (List<AppInfo>)apps.get(0):new ArrayList<AppInfo>());
-                LOG.info("Get " + appInfos.size() + " from yarn resource manager.");
-                for (AppInfo app: appInfos) {
-                    String appId = app.getId();
-                    if (!zkState.hasApplication(appId)) {
-                        zkState.addFinishedApplication(appId, app.getQueue(), app.getState(), app.getFinalStatus(), app.getUser(), app.getName());
+                if (appInfos != null) {
+                    LOG.info("Get " + appInfos.size() + " from yarn resource manager.");
+                    for (AppInfo app : appInfos) {
+                        String appId = app.getId();
+                        if (!zkState.hasApplication(appId)) {
+                            zkState.addFinishedApplication(appId, app.getQueue(), app.getState(), app.getFinalStatus(), app.getUser(), app.getName());
+                        }
                     }
                 }
                 this.lastFinishAppTime = fetchTime;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f5f972c/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
index f00fa1b..c515d32 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
@@ -60,7 +60,7 @@ public class SparkJobParseBolt extends BaseRichBolt {
     @Override
     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
         this.collector = outputCollector;
-        this.hdfsConf  = new Configuration();
+        this.hdfsConf = new Configuration();
         this.hdfsConf.set("fs.defaultFS", config.hdfsConfig.endpoint);
         this.hdfsConf.setBoolean("fs.hdfs.impl.disable.cache", true);
         this.hdfsConf.set("hdfs.kerberos.principal", config.hdfsConfig.principal);
@@ -73,17 +73,16 @@ public class SparkJobParseBolt extends BaseRichBolt {
     @Override
     public void execute(Tuple tuple) {
         String appId = tuple.getStringByField("appId");
-        FileSystem hdfs = null;
-        try {
-            if (!zkState.hasApplication(appId)) {
-                //may already be processed due to some reason
-                collector.ack(tuple);
-                return;
-            }
+        if (!zkState.hasApplication(appId)) {
+            //may already be processed due to some reason
+            collector.ack(tuple);
+            return;
+        }
 
+        try (FileSystem hdfs = HDFSUtil.getFileSystem(this.hdfsConf)) {
             SparkApplicationInfo info = zkState.getApplicationInfo(appId);
             //first try to get attempts under the application
-            hdfs = HDFSUtil.getFileSystem(this.hdfsConf);
+
             Set<String> inprogressSet = new HashSet<String>();
             List<String> attemptLogNames = this.getAttemptLogNameList(appId, hdfs, inprogressSet);
 
@@ -111,14 +110,6 @@ public class SparkJobParseBolt extends BaseRichBolt {
             LOG.error("Fail to process application {}", appId, e);
             zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FAILED);
             collector.fail(tuple);
-        } finally {
-            if (null != hdfs) {
-                try {
-                    hdfs.close();
-                } catch (Exception e) {
-                    LOG.error("Fail to close hdfs");
-                }
-            }
         }
     }
 
@@ -163,7 +154,18 @@ public class SparkJobParseBolt extends BaseRichBolt {
 
             boolean exists = true;
             while (exists) {
+                // For Yarn version 2.4.x
+                // log name: application_1464382345557_269065_1
                 String attemptIdString = Integer.toString(attemptId);
+
+                // For Yarn version >= 2.7,
+                // log name: "application_1468625664674_0003_appattempt_1468625664674_0003_000001"
+//                String attemptIdFormatted = String.format("%06d", attemptId);
+//
+//                // remove "application_" to get the number part of appID.
+//                String sparkAppIdNum = appId.substring(12);
+//                String attemptIdString = "appattempt_" + sparkAppIdNum + "_" + attemptIdFormatted;
+
                 String appAttemptLogName = this.getAppAttemptLogName(appId, attemptIdString);
                 LOG.info("Attempt ID: {}, App Attempt Log: {}", attemptIdString, appAttemptLogName);
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f5f972c/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
index 2696269..d738439 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
@@ -59,7 +59,7 @@ public class Utils {
         }
 
         if (timestamp == 0L) {
-            LOG.error("Not able to parse date: " + date);
+            LOG.warn("Not able to parse date: " + date);
         }
 
         return timestamp;
@@ -82,7 +82,8 @@ public class Utils {
             int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1));
             return 1024L * 1024 * 1024 * 1024 * 1024 * executorPB;
         }
-        LOG.info("Cannot parse memory info " + memory);
+        LOG.warn("Cannot parse memory info " +  memory);
+
         return 0L;
     }
 }
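
Utils.parseMemory carries the suffix-multiplier logic that was removed from JHFSparkEventReader; a standalone sketch of that logic (equivalent switch form, same 0L fallback for unparseable input):

    public class ParseMemorySketch {
        static long parseMemory(String memory) {
            long base;
            switch (Character.toLowerCase(memory.charAt(memory.length() - 1))) {
                case 'k': base = 1024L; break;
                case 'm': base = 1024L * 1024; break;
                case 'g': base = 1024L * 1024 * 1024; break;
                case 't': base = 1024L * 1024 * 1024 * 1024; break;
                case 'p': base = 1024L * 1024 * 1024 * 1024 * 1024; break;
                default:  return 0L; // cannot parse, as in the original
            }
            return base * Integer.parseInt(memory.substring(0, memory.length() - 1));
        }

        public static void main(String[] args) {
            System.out.println(parseMemory("512m")); // 536870912
            System.out.println(parseMemory("4g"));   // 4294967296
        }
    }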

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f5f972c/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
index eb13c3c..b1881ef 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
@@ -91,11 +91,11 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
     }
 
     private String getSparkRunningJobURL() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(selector.getSelectedUrl()).append("/").append(Constants.V2_APPS_URL);
-        sb.append("?applicationTypes=SPARK&state=RUNNING&");
-        sb.append(Constants.ANONYMOUS_PARAMETER);
-        return sb.toString();
+        return selector.getSelectedUrl()
+                + "/"
+                + Constants.V2_APPS_URL
+                + "?applicationTypes=SPARK&state=RUNNING&"
+                + Constants.ANONYMOUS_PARAMETER;
     }
 
     private String getMRRunningJobURL() {
@@ -105,14 +105,11 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
             Constants.ANONYMOUS_PARAMETER);
     }
 
-    public String getMRFinishedJobURL(String lastFinishedTime) {
+    private String getMRFinishedJobURL(String lastFinishedTime) {
         String url = URLUtil.removeTrailingSlash(selector.getSelectedUrl());
-        StringBuilder sb = new StringBuilder();
-        sb.append(url).append("/").append(Constants.V2_APPS_URL);
-        sb.append("?applicationTypes=MAPREDUCE&state=FINISHED&finishedTimeBegin=");
-        sb.append(lastFinishedTime).append("&").append(Constants.ANONYMOUS_PARAMETER);
-
-        return sb.toString();
+        return url + "/" + "Constants.V2_APPS_URL"
+                + "?applicationTypes=MAPREDUCE&state=FINISHED&finishedTimeBegin="
+                + lastFinishedTime + "&" + Constants.ANONYMOUS_PARAMETER;
     }
 
     private List<AppInfo> doFetchRunningApplicationsList(String urlString, Constants.CompressionType compressionType) throws Exception {
@@ -139,10 +136,10 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
         }
     }
 
-    private List<AppInfo> getResource(Constants.ResourceType resoureType, Constants.CompressionType compressionType, Object... parameter) throws Exception {
-        switch (resoureType) {
+    private List<AppInfo> getResource(Constants.ResourceType resourceType, Constants.CompressionType compressionType, Object... parameter) throws Exception {
+        switch (resourceType) {
             case COMPLETE_SPARK_JOB:
-                final String urlString = sparkCompleteJobServiceURLBuilder.build((String) parameter[0]);
+                final String urlString = sparkCompleteJobServiceURLBuilder.build(selector.getSelectedUrl(), (String) parameter[0]);
                 return doFetchFinishApplicationsList(urlString, compressionType);
             case RUNNING_SPARK_JOB:
                 return doFetchRunningApplicationsList(getSparkRunningJobURL(), compressionType);
@@ -151,22 +148,20 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
             case COMPLETE_MR_JOB:
                 return doFetchFinishApplicationsList(getMRFinishedJobURL((String) parameter[0]), compressionType);
             default:
-                throw new Exception("Not support resourceType :" + resoureType);
+                throw new Exception("Not support resourceType :" + resourceType);
         }
     }
 
-    public List<AppInfo> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception {
+    public List<AppInfo> getResource(Constants.ResourceType resourceType, Object... parameter) throws Exception {
         try {
-            return getResource(resoureType, Constants.CompressionType.GZIP, parameter);
+            return getResource(resourceType, Constants.CompressionType.GZIP, parameter);
         } catch (java.util.zip.ZipException ex) {
-            return getResource(resoureType, Constants.CompressionType.NONE, parameter);
+            return getResource(resourceType, Constants.CompressionType.NONE, parameter);
         }
     }
 
     private String getClusterInfoURL() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(selector.getSelectedUrl()).append("/").append(Constants.YARN_API_CLUSTER_INFO).append("?" + Constants.ANONYMOUS_PARAMETER);
-        return sb.toString();
+        return selector.getSelectedUrl() + "/" + Constants.YARN_API_CLUSTER_INFO + "?" + Constants.ANONYMOUS_PARAMETER;
     }
 
     public ClusterInfo getClusterInfo() throws Exception {
@@ -191,4 +186,4 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
             }
         }
     }
-}
+}
\ No newline at end of file
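
Assuming Constants.V2_APPS_URL is the usual YARN REST path ("ws/v1/cluster/apps") and Constants.ANONYMOUS_PARAMETER an "anonymous=true" query flag (both are assumptions about Eagle's Constants, which this diff does not show), the finished-MR URL built above comes out roughly as:

    http://<rm-host>:8088/ws/v1/cluster/apps?applicationTypes=MAPREDUCE&state=FINISHED&finishedTimeBegin=<lastFinishedTime>&anonymous=true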

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f5f972c/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
index 7c188c6..a083ef2 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
@@ -94,7 +94,7 @@ public class HAURLSelectorImpl implements HAURLSelector {
                         }
                         LOG.info("try url " + urlToCheck + "fail for " + (time + 1) + " times, sleep 5 seconds before try again. ");
                         try {
-                            Thread.sleep(1 * 1000);
+                            Thread.sleep(1000);
                         } catch (InterruptedException ex) {
                             LOG.warn("{}", ex);
                         }
@@ -106,4 +106,4 @@ public class HAURLSelectorImpl implements HAURLSelector {
             }
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6f5f972c/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java
index 8d959b7..ca6e938 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java
@@ -26,11 +26,8 @@ public class SparkCompleteJobServiceURLBuilderImpl implements ServiceURLBuilder
     public String build(String... parameters) {
         String url = URLUtil.removeTrailingSlash(parameters[0]);
 
-        StringBuilder sb = new StringBuilder();
-        sb.append(url).append("/").append(Constants.V2_APPS_URL);
-        sb.append("?applicationTypes=SPARK&state=FINISHED&finishedTimeBegin=");
-        sb.append(parameters[1]).append("&").append(Constants.ANONYMOUS_PARAMETER);
-
-        return sb.toString();
+        return url + "/" + Constants.V2_APPS_URL
+                + "?applicationTypes=SPARK&state=FINISHED&finishedTimeBegin="
+                + parameters[1] + "&" + Constants.ANONYMOUS_PARAMETER;
     }
-}
+}
\ No newline at end of file
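
This file is the other half of the URL-builder fix for completed jobs: build() expects the resource-manager base URL as parameters[0] and the finished-time cutoff as parameters[1], which is why RMResourceFetcher now passes selector.getSelectedUrl() first. A hedged usage sketch (host and timestamp are made up; the expanded URL assumes the Constants values noted earlier):

    ServiceURLBuilder builder = new SparkCompleteJobServiceURLBuilderImpl();
    String url = builder.build("http://rm-host:8088/", "1470768362299");
    // roughly: http://rm-host:8088/ws/v1/cluster/apps?applicationTypes=SPARK&state=FINISHED
    //          &finishedTimeBegin=1470768362299&anonymous=true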