You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/08/24 12:23:17 UTC

[3/5] incubator-eagle git commit: [EAGLE-496] fix code style of jpm

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
index 4163f7b..7293c89 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
@@ -33,7 +33,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
     private static final Logger logger = LoggerFactory.getLogger(JobConfigurationCreationServiceListener.class);
     private static final int MAX_RETRY_TIMES = 3;
     private MRHistoryJobConfig configManager;
-    private JobConfigurationAPIEntity m_jobConfigurationEntity;
+    private JobConfigurationAPIEntity jobConfigurationEntity;
 
     public JobConfigurationCreationServiceListener(MRHistoryJobConfig configManager) {
         this.configManager = configManager;
@@ -43,7 +43,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
     public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
         if (entity != null) {
             if (entity instanceof JobConfigurationAPIEntity) {
-                this.m_jobConfigurationEntity = (JobConfigurationAPIEntity) entity;
+                this.jobConfigurationEntity = (JobConfigurationAPIEntity) entity;
             }
         }
     }
@@ -65,7 +65,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
 
         client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
         List<JobConfigurationAPIEntity> list = new ArrayList<>();
-        list.add(m_jobConfigurationEntity);
+        list.add(jobConfigurationEntity);
 
         int tried = 0;
         while (tried <= MAX_RETRY_TIMES) {
@@ -82,7 +82,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
                 }
             } finally {
                 list.clear();
-                m_jobConfigurationEntity = null;
+                jobConfigurationEntity = null;
                 client.getJerseyClient().destroy();
                 client.close();
             }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index abddf3b..e7b8a6b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -62,7 +62,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
     }
 
     /**
-     * We need save network bandwidth as well
+     * We need save network bandwidth as well.
      */
     @Override
     public void flush() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
index 8b85b26..a80462d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
@@ -22,11 +22,6 @@ import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
 
 import java.util.Vector;
 
-/**
- * not thread safe
- *
- * @author yonzhang
- */
 public class JobEntityCreationPublisher {
     private Vector<HistoryJobEntityCreationListener> listeners = new Vector<>(2);
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
index 594d8e2..ea0ba30 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
@@ -18,37 +18,36 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
 public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleListener {
-    private final static Logger LOG = LoggerFactory.getLogger(JobEntityLifecycleAggregator.class);
-    private JobExecutionAPIEntity m_jobExecutionAPIEntity;
-    private final JobCounterAggregateFunction m_mapTaskAttemptCounterAgg;
-    private final JobCounterAggregateFunction m_reduceTaskAttemptCounterAgg;
+    private static final Logger LOG = LoggerFactory.getLogger(JobEntityLifecycleAggregator.class);
+    private JobExecutionAPIEntity jobExecutionAPIEntity;
+    private final JobCounterAggregateFunction mapTaskAttemptCounterAgg;
+    private final JobCounterAggregateFunction reduceTaskAttemptCounterAgg;
 
-    private final JobCounterAggregateFunction m_mapFileSystemCounterAgg;
-    private final JobCounterAggregateFunction m_reduceFileSystemTaskCounterAgg;
+    private final JobCounterAggregateFunction mapFileSystemCounterAgg;
+    private final JobCounterAggregateFunction reduceFileSystemTaskCounterAgg;
 
-    private long m_mapAttemptDuration = 0;
-    private long m_reduceAttemptDuration = 0;
+    private long mapAttemptDuration = 0;
+    private long reduceAttemptDuration = 0;
     private boolean jobFinished = false;
 
     public JobEntityLifecycleAggregator() {
-        this.m_mapTaskAttemptCounterAgg = new JobCounterSumFunction();
-        this.m_reduceTaskAttemptCounterAgg = new JobCounterSumFunction();
-        this.m_mapFileSystemCounterAgg = new JobCounterSumFunction();
-        this.m_reduceFileSystemTaskCounterAgg = new JobCounterSumFunction();
+        this.mapTaskAttemptCounterAgg = new JobCounterSumFunction();
+        this.reduceTaskAttemptCounterAgg = new JobCounterSumFunction();
+        this.mapFileSystemCounterAgg = new JobCounterSumFunction();
+        this.reduceFileSystemTaskCounterAgg = new JobCounterSumFunction();
     }
 
     @Override
@@ -57,7 +56,7 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
             if (entity instanceof TaskAttemptExecutionAPIEntity) {
                 taskAttemptEntityCreated((TaskAttemptExecutionAPIEntity) entity);
             } else if (entity instanceof JobExecutionAPIEntity) {
-                this.m_jobExecutionAPIEntity = (JobExecutionAPIEntity) entity;
+                this.jobExecutionAPIEntity = (JobExecutionAPIEntity) entity;
             }
         }
     }
@@ -65,44 +64,44 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
     @Override
     public void jobFinish() {
         try {
-            if (m_jobExecutionAPIEntity == null) {
+            if (jobExecutionAPIEntity == null) {
                 throw new IOException("No JobExecutionAPIEntity found before flushing");
             }
 
             LOG.debug("Updating aggregated task attempts to job level counters");
 
-            JobCounters jobCounters = m_jobExecutionAPIEntity.getJobCounters();
+            JobCounters jobCounters = jobExecutionAPIEntity.getJobCounters();
 
             if (jobCounters == null) {
-                LOG.warn("no job counter found for " + this.m_jobExecutionAPIEntity);
+                LOG.warn("no job counter found for " + this.jobExecutionAPIEntity);
                 jobCounters = new JobCounters();
             }
 
             Map<String, Map<String, Long>> counters = jobCounters.getCounters();
 
-            Map<String, Long> mapTaskAttemptCounter = this.m_mapTaskAttemptCounterAgg.result();
+            Map<String, Long> mapTaskAttemptCounter = this.mapTaskAttemptCounterAgg.result();
             if (mapTaskAttemptCounter == null) {
                 mapTaskAttemptCounter = new HashMap<>();
             }
-            mapTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_mapAttemptDuration);
+            mapTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.mapAttemptDuration);
             counters.put(Constants.MAP_TASK_ATTEMPT_COUNTER, mapTaskAttemptCounter);
 
-            Map<String, Long> reduceTaskAttemptCounter = this.m_reduceTaskAttemptCounterAgg.result();
+            Map<String, Long> reduceTaskAttemptCounter = this.reduceTaskAttemptCounterAgg.result();
             if (reduceTaskAttemptCounter == null) {
                 reduceTaskAttemptCounter = new HashMap<>();
             }
-            reduceTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_reduceAttemptDuration);
+            reduceTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.reduceAttemptDuration);
             counters.put(Constants.REDUCE_TASK_ATTEMPT_COUNTER, reduceTaskAttemptCounter);
 
-            counters.put(Constants.MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_mapFileSystemCounterAgg.result());
-            counters.put(Constants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_reduceFileSystemTaskCounterAgg.result());
+            counters.put(Constants.MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.mapFileSystemCounterAgg.result());
+            counters.put(Constants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.reduceFileSystemTaskCounterAgg.result());
 
             jobCounters.setCounters(counters);
 
-            m_jobExecutionAPIEntity.setJobCounters(jobCounters);
+            jobExecutionAPIEntity.setJobCounters(jobCounters);
             jobFinished = true;
         } catch (Exception e) {
-            LOG.error("Failed to update job execution entity: " + this.m_jobExecutionAPIEntity.toString() + ", due to " + e.getMessage(), e);
+            LOG.error("Failed to update job execution entity: " + this.jobExecutionAPIEntity.toString() + ", due to " + e.getMessage(), e);
         }
     }
 
@@ -112,14 +111,14 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
 
         if (taskType != null && jobCounters != null && jobCounters.getCounters() != null) {
             if (Constants.TaskType.MAP.toString().equals(taskType.toUpperCase())) {
-                m_mapAttemptDuration += entity.getDuration();
-                this.m_mapTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(Constants.TASK_COUNTER));
-                this.m_mapFileSystemCounterAgg.accumulate(jobCounters.getCounters().get(Constants.FILE_SYSTEM_COUNTER));
+                mapAttemptDuration += entity.getDuration();
+                this.mapTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(Constants.TASK_COUNTER));
+                this.mapFileSystemCounterAgg.accumulate(jobCounters.getCounters().get(Constants.FILE_SYSTEM_COUNTER));
                 return;
             } else if (Constants.TaskType.REDUCE.toString().equals(taskType.toUpperCase())) {
-                m_reduceAttemptDuration += entity.getDuration();
-                this.m_reduceTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(Constants.TASK_COUNTER));
-                this.m_reduceFileSystemTaskCounterAgg.accumulate(jobCounters.getCounters().get(Constants.FILE_SYSTEM_COUNTER));
+                reduceAttemptDuration += entity.getDuration();
+                this.reduceTaskAttemptCounterAgg.accumulate(jobCounters.getCounters().get(Constants.TASK_COUNTER));
+                this.reduceFileSystemTaskCounterAgg.accumulate(jobCounters.getCounters().get(Constants.FILE_SYSTEM_COUNTER));
                 return;
             }
         }
@@ -151,10 +150,7 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
         public JobCounterSumFunction() {
             result = new HashMap<>();
         }
-
-        /**
-         * @param counters
-         */
+        
         @Override
         public void accumulate(Map<String, Long> counters) {
             if (counters != null) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index e0c3c6b..f95eaa2 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -37,7 +37,7 @@ import java.util.List;
 import java.util.Map;
 
 public class TaskFailureListener implements HistoryJobEntityCreationListener {
-    private static final Logger logger = LoggerFactory.getLogger(TaskFailureListener.class);
+    private static final Logger LOG = LoggerFactory.getLogger(TaskFailureListener.class);
     private static final String MR_ERROR_CATEGORY_CONFIG_FILE_NAME = "MRErrorCategory.config";
     private static final int BATCH_SIZE = 1000;
     private static final int MAX_RETRY_TIMES = 3;
@@ -53,7 +53,7 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
             is = TaskFailureListener.class.getClassLoader().getResourceAsStream(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
             URL url = TaskFailureListener.class.getClassLoader().getResource(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
             if (url != null) {
-                logger.info("Feeder is going to load configuration file: " + url.toString());
+                LOG.info("Feeder is going to load configuration file: " + url.toString());
             }
             classifier = new MRErrorClassifier(is);
         } catch (IOException ex) {
@@ -63,6 +63,7 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
                 try {
                     is.close();
                 } catch (IOException e) {
+                    LOG.warn("exception found {}", e);
                 }
             }
         }
@@ -121,16 +122,16 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
         int tried = 0;
         while (tried <= MAX_RETRY_TIMES) {
             try {
-                logger.info("start flushing entities of total number " + failureTasks.size());
+                LOG.info("start flushing entities of total number " + failureTasks.size());
                 client.create(failureTasks);
-                logger.info("finish flushing entities of total number " + failureTasks.size());
+                LOG.info("finish flushing entities of total number " + failureTasks.size());
                 failureTasks.clear();
                 break;
             } catch (Exception ex) {
                 if (tried < MAX_RETRY_TIMES) {
-                    logger.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
+                    LOG.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
                 } else {
-                    logger.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
+                    LOG.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
                     throw ex;
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
index cef29fe..87079fd 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobMain.java
@@ -18,18 +18,17 @@
 
 package org.apache.eagle.jpm.mr.running;
 
+
+import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
+import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt;
+import org.apache.eagle.jpm.util.Constants;
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.StormSubmitter;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
-import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
-import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
-import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt;
-import org.apache.eagle.jpm.util.Constants;
-
 import java.util.List;
-import java.util.regex.Pattern;
 
 public class MRRunningJobMain {
     public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
index a91a493..42426e4 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/config/MRRunningConfigManager.java
@@ -18,8 +18,8 @@
 
 package org.apache.eagle.jpm.mr.running.config;
 
-import com.typesafe.config.Config;
 import org.apache.eagle.common.config.ConfigOptionParser;
+import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,27 +27,35 @@ import java.io.Serializable;
 
 public class MRRunningConfigManager implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(MRRunningConfigManager.class);
+
     public String getEnv() {
         return env;
     }
+
     private String env;
 
-    public ZKStateConfig getZkStateConfig() { return zkStateConfig; }
+    public ZKStateConfig getZkStateConfig() {
+        return zkStateConfig;
+    }
+
     private ZKStateConfig zkStateConfig;
 
     public EagleServiceConfig getEagleServiceConfig() {
         return eagleServiceConfig;
     }
+
     private EagleServiceConfig eagleServiceConfig;
 
     public JobExtractorConfig getJobExtractorConfig() {
         return jobExtractorConfig;
     }
+
     private JobExtractorConfig jobExtractorConfig;
 
     public EndpointConfig getEndpointConfig() {
         return endpointConfig;
     }
+
     private EndpointConfig endpointConfig;
 
     public static class ZKStateConfig implements Serializable {
@@ -82,6 +90,7 @@ public class MRRunningConfigManager implements Serializable {
     public Config getConfig() {
         return config;
     }
+
     private Config config;
 
     private static MRRunningConfigManager manager = new MRRunningConfigManager();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index b7de79e..3b31d93 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -18,7 +18,6 @@
 
 package org.apache.eagle.jpm.mr.running.parser;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
 import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
 import org.apache.eagle.jpm.mr.runningentity.JobConfig;
@@ -29,11 +28,12 @@ import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.JobNameNormalization;
 import org.apache.eagle.jpm.util.MRJobTagName;
 import org.apache.eagle.jpm.util.Utils;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
-import org.apache.eagle.jpm.util.resourceFetch.connection.URLConnectionUtils;
-import org.apache.eagle.jpm.util.resourceFetch.model.*;
-import org.apache.eagle.jpm.util.resourceFetch.model.JobCounters;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourcefetch.connection.URLConnectionUtils;
+import org.apache.eagle.jpm.util.resourcefetch.model.*;
+import org.apache.eagle.jpm.util.resourcefetch.model.JobCounters;
+import org.apache.commons.lang3.tuple.Pair;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
@@ -42,13 +42,12 @@ import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
 import java.io.InputStream;
 import java.net.URLConnection;
 import java.util.*;
 import java.util.function.Function;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
 
 public class MRJobParser implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(MRJobParser.class);
@@ -58,6 +57,7 @@ public class MRJobParser implements Runnable {
         FINISHED,
         APP_FINISHED
     }
+
     private AppInfo app;
     private static final int MAX_RETRY_TIMES = 2;
     private MRJobEntityCreationHandler mrJobEntityCreationHandler;
@@ -78,6 +78,7 @@ public class MRJobParser implements Runnable {
     private Set<String> finishedTaskIds;
     private List<String> configKeys;
     private MRRunningConfigManager.JobExtractorConfig jobExtractorConfig;
+
     static {
         OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
     }
@@ -187,11 +188,11 @@ public class MRJobParser implements Runnable {
                 mrJobEntityMap.put(id, new JobExecutionAPIEntity());
             }
 
-            String jobDefId = JobNameNormalization.getInstance().normalize(mrJob.getName());
             JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(id);
             jobExecutionAPIEntity.setTags(new HashMap<>(commonTags));
             jobExecutionAPIEntity.getTags().put(MRJobTagName.JOB_ID.toString(), id);
             jobExecutionAPIEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), mrJob.getName());
+            String jobDefId = JobNameNormalization.getInstance().normalize(mrJob.getName());
             jobExecutionAPIEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), jobDefId);
             if (mrJobConfigs.get(id) != null) {
                 JobConfig jobConfig = mrJobConfigs.get(id);
@@ -248,7 +249,9 @@ public class MRJobParser implements Runnable {
             Utils.closeInputStream(is);
         }
 
-        if (jobCounters.getCounterGroup() == null) return true;
+        if (jobCounters.getCounterGroup() == null) {
+            return true;
+        }
         JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(jobId);
         org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounter = new org.apache.eagle.jpm.util.jobcounter.JobCounters();
         Map<String, Map<String, Long>> groups = new HashMap<>();
@@ -261,7 +264,9 @@ public class MRJobParser implements Runnable {
 
             Map<String, Long> counterValues = groups.get(counterGroupName);
             List<JobCounterItem> items = jobCounterGroup.getCounter();
-            if (items == null) continue;
+            if (items == null) {
+                continue;
+            }
             for (JobCounterItem item : items) {
                 String key = item.getName();
                 counterValues.put(key, item.getTotalCounterValue());
@@ -290,7 +295,11 @@ public class MRJobParser implements Runnable {
         org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounter = new org.apache.eagle.jpm.util.jobcounter.JobCounters();
         String jobId = jobAndTaskId.getLeft();
         String taskId = jobAndTaskId.getRight();
-        String taskCounterURL = app.getTrackingUrl() + Constants.MR_JOBS_URL + "/" + jobId + "/" + Constants.MR_TASKS_URL + "/" + taskId + "/" + Constants.MR_JOB_COUNTERS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+        String taskCounterURL = app.getTrackingUrl()
+            + Constants.MR_JOBS_URL + "/"
+            + jobId + "/" + Constants.MR_TASKS_URL + "/"
+            + taskId + "/" + Constants.MR_JOB_COUNTERS_URL
+            + "?" + Constants.ANONYMOUS_PARAMETER;
         InputStream is = null;
         TaskCounters taskCounters = null;
         try {
@@ -304,7 +313,9 @@ public class MRJobParser implements Runnable {
             Utils.closeInputStream(is);
         }
 
-        if (taskCounters.getTaskCounterGroup() == null) return jobCounter;
+        if (taskCounters.getTaskCounterGroup() == null) {
+            return jobCounter;
+        }
         Map<String, Map<String, Long>> groups = new HashMap<>();
 
         for (TaskCounterGroup taskCounterGroup : taskCounters.getTaskCounterGroup()) {
@@ -314,7 +325,9 @@ public class MRJobParser implements Runnable {
 
             Map<String, Long> counterValues = groups.get(taskCounterGroup.getCounterGroupName());
             List<TaskCounterItem> items = taskCounterGroup.getCounter();
-            if (items == null) continue;
+            if (items == null) {
+                continue;
+            }
             for (TaskCounterItem item : items) {
                 counterValues.put(item.getName(), item.getValue());
             }
@@ -328,7 +341,10 @@ public class MRJobParser implements Runnable {
     private Function<Pair<String, String>, TaskAttemptExecutionAPIEntity> fetchTaskAttempt = jobAndTaskId -> {
         String jobId = jobAndTaskId.getLeft();
         String taskId = jobAndTaskId.getRight();
-        String taskAttemptURL = app.getTrackingUrl() + Constants.MR_JOBS_URL + "/" + jobId + "/" + Constants.MR_TASKS_URL + "/" + taskId + "/" + Constants.MR_TASK_ATTEMPTS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
+        String taskAttemptURL = app.getTrackingUrl()
+            + Constants.MR_JOBS_URL + "/"
+            + jobId + "/" + Constants.MR_TASKS_URL + "/"
+            + taskId + "/" + Constants.MR_TASK_ATTEMPTS_URL + "?" + Constants.ANONYMOUS_PARAMETER;
         InputStream is = null;
         List<MRTaskAttempt> taskAttempts = null;
         try {
@@ -463,10 +479,10 @@ public class MRJobParser implements Runnable {
 
             mrJobEntityCreationHandler.add(taskExecutionAPIEntity);
 
-            if (task.getState().equals(Constants.TaskState.SUCCEEDED.toString()) ||
-                    task.getState().equals(Constants.TaskState.FAILED.toString()) ||
-                    task.getState().equals(Constants.TaskState.KILLED.toString()) ||
-                    task.getState().equals(Constants.TaskState.KILL_WAIT.toString())) {
+            if (task.getState().equals(Constants.TaskState.SUCCEEDED.toString())
+                || task.getState().equals(Constants.TaskState.FAILED.toString())
+                || task.getState().equals(Constants.TaskState.KILLED.toString())
+                || task.getState().equals(Constants.TaskState.KILL_WAIT.toString())) {
                 //LOG.info("mr job {} task {} has finished", jobId, task.getId());
                 this.finishedTaskIds.add(task.getId());
             }
@@ -546,10 +562,10 @@ public class MRJobParser implements Runnable {
                     mrJobEntityMap.keySet()
                             .stream()
                             .filter(
-                                    jobId -> mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FINISHED.toString()) ||
-                                            mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FAILED.toString()))
+                                jobId -> mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FINISHED.toString())
+                                    || mrJobEntityMap.get(jobId).getCurrentState().equals(Constants.AppState.FAILED.toString()))
                             .forEach(
-                                    jobId -> this.runningJobManager.delete(app.getId(), jobId));
+                                jobId -> this.runningJobManager.delete(app.getId(), jobId));
                 }
 
                 LOG.info("finish process yarn application " + app.getId());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
index 978c3ec..75650b7 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/recover/MRRunningJobManager.java
@@ -18,14 +18,14 @@
 
 package org.apache.eagle.jpm.mr.running.recover;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.jpm.mr.running.config.MRRunningConfigManager;
 import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
-
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+import org.apache.commons.lang3.tuple.Pair;
 import java.io.Serializable;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
 
 public class MRRunningJobManager implements Serializable {
     private RunningJobManager runningJobManager;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
index a701d74..ebb9144 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
@@ -29,9 +29,9 @@ import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
 import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.Utils;
-import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
index 51307e1..0dccd70 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
@@ -28,9 +28,9 @@ import org.apache.eagle.jpm.mr.running.parser.MRJobParser;
 import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
 import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
index 4282a64..e6cd2f6 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
@@ -25,7 +25,7 @@ import com.typesafe.config.ConfigFactory;
 
 import java.io.Serializable;
 
-public class SparkHistoryCrawlConfig implements Serializable{
+public class SparkHistoryCrawlConfig implements Serializable {
     public ZKStateConfig zkStateConfig;
     public JobHistoryEndpointConfig jobHistoryConfig;
     public HDFSConfig hdfsConfig;
@@ -34,6 +34,7 @@ public class SparkHistoryCrawlConfig implements Serializable{
     public StormConfig stormConfig;
 
     private Config config;
+    
     public Config getConfig() {
         return config;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
index 0c475a9..382375f 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
@@ -19,18 +19,17 @@
 
 package org.apache.eagle.jpm.spark.history.status;
 
+import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
 import org.apache.curator.framework.recipes.locks.InterProcessLock;
 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
 import org.apache.curator.retry.RetryNTimes;
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
-import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -38,7 +37,7 @@ import java.util.List;
 public class JobHistoryZKStateManager {
     public static final Logger LOG = LoggerFactory.getLogger(JobHistoryZKStateManager.class);
     private String zkRoot;
-    private CuratorFramework _curator;
+    private CuratorFramework curator;
     private static String START_TIMESTAMP = "lastAppTime";
 
     private CuratorFramework newCurator(SparkHistoryCrawlConfig config) throws Exception {
@@ -54,46 +53,46 @@ public class JobHistoryZKStateManager {
         this.zkRoot = config.zkStateConfig.zkRoot + "/" + config.info.site;
 
         try {
-            _curator = newCurator(config);
-            _curator.start();
-;        } catch (Exception e) {
+            curator = newCurator(config);
+            curator.start();
+        } catch (Exception e) {
             LOG.error("Fail to connect to zookeeper", e);
             throw new RuntimeException(e);
         }
     }
 
     public void close() {
-        _curator.close();
-        _curator = null;
+        curator.close();
+        curator = null;
     }
 
-    public List<String> loadApplications(int limit){
+    public List<String> loadApplications(int limit) {
         String jobPath = zkRoot + "/jobs";
         List<String> apps = new ArrayList<>();
-        InterProcessLock lock = new InterProcessReadWriteLock(_curator,jobPath).writeLock();
-        try{
+        InterProcessLock lock = new InterProcessReadWriteLock(curator,jobPath).writeLock();
+        try {
             lock.acquire();
-            Iterator<String> iter =  _curator.getChildren().forPath(jobPath).iterator();
-            while(iter.hasNext()) {
+            Iterator<String> iter =  curator.getChildren().forPath(jobPath).iterator();
+            while (iter.hasNext()) {
                 String appId = iter.next();
                 String path = jobPath + "/" + appId;
-                if(_curator.checkExists().forPath(path) != null){
-                    if(new String(_curator.getData().forPath(path)).equals(ZKStateConstant.AppStatus.INIT.toString())){
+                if (curator.checkExists().forPath(path) != null) {
+                    if (new String(curator.getData().forPath(path)).equals(ZKStateConstant.AppStatus.INIT.toString())) {
                         apps.add(appId);
                     }
                 }
-                if(apps.size() == limit){
+                if (apps.size() == limit) {
                     break;
                 }
             }
             return apps;
-        }catch(Exception e){
+        } catch (Exception e) {
             LOG.error("fail to read unprocessed jobs", e);
             throw new RuntimeException(e);
-        }finally {
-            try{
+        } finally {
+            try {
                 lock.release();
-            }catch(Exception e){
+            } catch (Exception e) {
                 LOG.error("fail to release lock", e);
             }
 
@@ -102,18 +101,19 @@ public class JobHistoryZKStateManager {
 
     public void resetApplications() {
         String jobPath = zkRoot + "/jobs";
-        InterProcessLock lock = new InterProcessReadWriteLock(_curator,jobPath).writeLock();
+        InterProcessLock lock = new InterProcessReadWriteLock(curator,jobPath).writeLock();
         try {
             lock.acquire();
-            Iterator<String> iter =  _curator.getChildren().forPath(jobPath).iterator();
+            Iterator<String> iter =  curator.getChildren().forPath(jobPath).iterator();
             while (iter.hasNext()) {
                 String appId = iter.next();
                 String path = jobPath + "/" + appId;
                 try {
-                    if (_curator.checkExists().forPath(path) != null) {
-                        String status = new String(_curator.getData().forPath(path));
-                        if(!ZKStateConstant.AppStatus.INIT.toString().equals(status))
-                            _curator.setData().forPath(path, ZKStateConstant.AppStatus.INIT.toString().getBytes("UTF-8"));
+                    if (curator.checkExists().forPath(path) != null) {
+                        String status = new String(curator.getData().forPath(path));
+                        if (!ZKStateConstant.AppStatus.INIT.toString().equals(status)) {
+                            curator.setData().forPath(path, ZKStateConstant.AppStatus.INIT.toString().getBytes("UTF-8"));
+                        }
                     }
                 } catch (Exception e) {
                     LOG.error("fail to read unprocessed job", e);
@@ -127,136 +127,133 @@ public class JobHistoryZKStateManager {
         } finally {
             try {
                 lock.release();
-            } catch(Exception e) {
+            } catch (Exception e) {
                 LOG.error("fail to release lock", e);
             }
         }
     }
 
-    public SparkApplicationInfo getApplicationInfo(String appId){
+    public SparkApplicationInfo getApplicationInfo(String appId) {
 
-        String appPath = zkRoot + "/jobs/" + appId +"/info";
-        try{
+        String appPath = zkRoot + "/jobs/" + appId + "/info";
+        try {
             SparkApplicationInfo info = new SparkApplicationInfo();
-            if(_curator.checkExists().forPath(appPath)!= null){
-                String[] appStatus = new String(_curator.getData().forPath(appPath)).split("/");
+            if (curator.checkExists().forPath(appPath) != null) {
+                String[] appStatus = new String(curator.getData().forPath(appPath)).split("/");
                 info.setQueue(appStatus[0]);
                 info.setState(appStatus[1]);
                 info.setFinalStatus(appStatus[2]);
-                if(appStatus.length > 3){
+                if (appStatus.length > 3) {
                     info.setUser(appStatus[3]);
                     info.setName(appStatus[4]);
                 }
 
             }
             return info;
-        }catch(Exception e){
+        } catch (Exception e) {
             LOG.error("fail to read application attempt info", e);
             throw new RuntimeException(e);
         }
     }
 
-    public long readLastFinishedTimestamp(){
+    public long readLastFinishedTimestamp() {
         String lastTimeStampPath = zkRoot + "/" + START_TIMESTAMP;
 
-        try{
-            if(_curator.checkExists().forPath(lastTimeStampPath) == null){
-                return 0l;
-            }else{
-                return Long.valueOf(new String(_curator.getData().forPath(lastTimeStampPath)));
+        try {
+            if (curator.checkExists().forPath(lastTimeStampPath) == null) {
+                return 0L;
+            } else {
+                return Long.valueOf(new String(curator.getData().forPath(lastTimeStampPath)));
             }
-        }catch(Exception e){
+        } catch (Exception e) {
             LOG.error("fail to read last finished spark job timestamp", e);
             throw new RuntimeException(e);
         }
     }
 
-    public boolean hasApplication(String appId){
+    public boolean hasApplication(String appId) {
         String path = zkRoot + "/jobs/" + appId;
         try {
-            if (_curator.checkExists().forPath(path) != null) {
+            if (curator.checkExists().forPath(path) != null) {
                 return true;
             }
             return false;
-        }catch (Exception e){
+        } catch (Exception e) {
             LOG.error("fail to check whether application exists", e);
             throw new RuntimeException(e);
         }
     }
 
-    public void addFinishedApplication(String appId, String queue, String yarnState, String yarnStatus, String user, String name){
+    public void addFinishedApplication(String appId, String queue, String yarnState, String yarnStatus, String user, String name) {
         String path = zkRoot + "/jobs/" + appId;
-
-
-        try{
-            if(_curator.checkExists().forPath(path) != null){
-                _curator.delete().deletingChildrenIfNeeded().forPath(path);
+        try {
+            if (curator.checkExists().forPath(path) != null) {
+                curator.delete().deletingChildrenIfNeeded().forPath(path);
             }
 
             name = name.replace("/","_");
-            if(name.length() > 50){
+            if (name.length() > 50) {
                 name = name.substring(0, 50);
             }
 
-            CuratorTransactionBridge result =  _curator.inTransaction().create().withMode(CreateMode.PERSISTENT).forPath(path, ZKStateConstant.AppStatus.INIT.toString().getBytes("UTF-8"));
+            CuratorTransactionBridge result =  curator.inTransaction().create().withMode(CreateMode.PERSISTENT).forPath(path, ZKStateConstant.AppStatus.INIT.toString().getBytes("UTF-8"));
             result = result.and().create().withMode(CreateMode.PERSISTENT).forPath(path + "/info", String.format("%s/%s/%s/%s/%s", queue, yarnState, yarnStatus, user, name).getBytes("UTF-8"));
 
             result.and().commit();
-        }catch (Exception e){
+        } catch (Exception e) {
             LOG.error("fail adding finished application", e);
             throw new RuntimeException(e);
         }
     }
 
 
-    public void updateLastUpdateTime(Long updateTime){
+    public void updateLastUpdateTime(Long updateTime) {
         String lastTimeStampPath = zkRoot + "/" + START_TIMESTAMP;
-        try{
-            if(_curator.checkExists().forPath(lastTimeStampPath) == null){
-                _curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(lastTimeStampPath, updateTime.toString().getBytes("UTF-8"));
-            }else{
+        try {
+            if (curator.checkExists().forPath(lastTimeStampPath) == null) {
+                curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(lastTimeStampPath, updateTime.toString().getBytes("UTF-8"));
+            } else {
                 long originalEndTime = this.readLastFinishedTimestamp();
-                if(originalEndTime < updateTime){
-                   _curator.setData().forPath(lastTimeStampPath, updateTime.toString().getBytes("UTF-8"));
+                if (originalEndTime < updateTime) {
+                    curator.setData().forPath(lastTimeStampPath, updateTime.toString().getBytes("UTF-8"));
                 }
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             LOG.error("fail to update last finished time", e);
             throw new RuntimeException(e);
         }
 
     }
 
-    public void updateApplicationStatus(String appId, Enum<ZKStateConstant.AppStatus> status){
+    public void updateApplicationStatus(String appId, Enum<ZKStateConstant.AppStatus> status) {
 
         String path = zkRoot + "/jobs/" + appId ;
-        InterProcessLock lock = new InterProcessReadWriteLock(_curator,zkRoot+"/jobs").readLock();
-        try{
-            if(_curator.checkExists().forPath(path) != null){
-                if(status.equals(ZKStateConstant.AppStatus.FINISHED)){
+        InterProcessLock lock = new InterProcessReadWriteLock(curator,zkRoot + "/jobs").readLock();
+        try {
+            if (curator.checkExists().forPath(path) != null) {
+                if (status.equals(ZKStateConstant.AppStatus.FINISHED)) {
                     lock.acquire();
-                    _curator.delete().deletingChildrenIfNeeded().forPath(path);
-                }else{
-                    _curator.setData().forPath(path, status.toString().getBytes("UTF-8"));
+                    curator.delete().deletingChildrenIfNeeded().forPath(path);
+                } else {
+                    curator.setData().forPath(path, status.toString().getBytes("UTF-8"));
                 }
-            }else{
+            } else {
                 String errorMsg = String.format("fail to update for application with path %s", path);
                 LOG.error(errorMsg);
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             LOG.error("fail to update application status", e);
             throw new RuntimeException(e);
-        }finally{
-            try{
-                if(lock.isAcquiredInThisProcess())
+        } finally {
+            try {
+                if (lock.isAcquiredInThisProcess()) {
                     lock.release();
-            }catch (Exception e){
+                }
+            } catch (Exception e) {
                 LOG.error("fail to release lock",e);
             }
 
         }
 
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java
index 40efa50..578ba3c 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/ZKStateConstant.java
@@ -21,7 +21,7 @@ package org.apache.eagle.jpm.spark.history.status;
 
 public class ZKStateConstant {
 
-    public enum AppStatus{
+    public enum AppStatus {
         INIT, SENT_FOR_PARSE, FINISHED, FAILED
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
index 8404eda..8965d3d 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
@@ -19,26 +19,24 @@
 
 package org.apache.eagle.jpm.spark.history.storm;
 
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
-import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
-import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.List;
-import java.util.ArrayList;
 import java.util.Map;
 
 public class FinishedSparkJobSpout extends BaseRichSpout {
@@ -52,7 +50,7 @@ public class FinishedSparkJobSpout extends BaseRichSpout {
 
     private static final int FAIL_MAX_TIMES = 5;
 
-    public FinishedSparkJobSpout(SparkHistoryCrawlConfig config){
+    public FinishedSparkJobSpout(SparkHistoryCrawlConfig config) {
         this.config = config;
     }
 
@@ -102,15 +100,14 @@ public class FinishedSparkJobSpout extends BaseRichSpout {
             }
         } catch (Exception e) {
             LOG.error("Fail to run next tuple", e);
-           // this.takeRest(10);
         }
-
     }
 
     private void takeRest(int seconds) {
         try {
             Thread.sleep(seconds * 1000);
-        } catch(InterruptedException e) {
+        } catch (InterruptedException e) {
+            LOG.warn("exception found {}", e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
index bd0eb85..423dbef 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
@@ -19,32 +19,32 @@
 
 package org.apache.eagle.jpm.spark.history.storm;
 
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.StormSubmitter;
 import backtype.storm.topology.TopologyBuilder;
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
 
 public class SparkHistoryTopology {
 
-    private SparkHistoryCrawlConfig SHConfig;
+    private SparkHistoryCrawlConfig sparkHistoryCrawlConfig;
 
-    public SparkHistoryTopology(SparkHistoryCrawlConfig config){
-        this.SHConfig = config;
+    public SparkHistoryTopology(SparkHistoryCrawlConfig config) {
+        this.sparkHistoryCrawlConfig = config;
     }
 
     public TopologyBuilder getBuilder() {
         TopologyBuilder builder = new TopologyBuilder();
         String spoutName = "sparkHistoryJobSpout";
         String boltName = "sparkHistoryJobBolt";
-        com.typesafe.config.Config config = this.SHConfig.getConfig();
+        com.typesafe.config.Config config = this.sparkHistoryCrawlConfig.getConfig();
         builder.setSpout(spoutName,
-                new FinishedSparkJobSpout(SHConfig),
+                new FinishedSparkJobSpout(sparkHistoryCrawlConfig),
                 config.getInt("storm.parallelismConfig." + spoutName)
         ).setNumTasks(config.getInt("storm.tasks." + spoutName));
 
         builder.setBolt(boltName,
-                new SparkJobParseBolt(SHConfig),
+                new SparkJobParseBolt(sparkHistoryCrawlConfig),
                 config.getInt("storm.parallelismConfig." + boltName)
         ).setNumTasks(config.getInt("storm.tasks." + boltName)).shuffleGrouping(spoutName);
         return builder;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
index 23d5152..f00fa1b 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
@@ -19,11 +19,6 @@
 
 package org.apache.eagle.jpm.spark.history.storm;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
 import org.apache.eagle.jpm.spark.crawl.JHFInputStreamReader;
 import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
 import org.apache.eagle.jpm.spark.crawl.SparkFilesystemInputStreamReaderImpl;
@@ -31,9 +26,14 @@ import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
 import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
 import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
 import org.apache.eagle.jpm.util.HDFSUtil;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.SparkHistoryServerResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplication;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.SparkHistoryServerResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.SparkApplication;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
index eb30f5e..f4284e1 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
@@ -28,9 +28,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TestHDFS {
-
     private static final Logger LOG = LoggerFactory.getLogger(TestHDFS.class);
-    public static void main(String[] args) throws Exception{
+
+    public static void main(String[] args) throws Exception {
         SparkHistoryCrawlConfig config = new SparkHistoryCrawlConfig();
 
         Configuration conf  = new Configuration();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
index 61c0751..5e21406 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
@@ -16,14 +16,15 @@
  */
 package org.apache.eagle.jpm.spark.running;
 
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
+
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
 import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobFetchSpout;
 import org.apache.eagle.jpm.spark.running.storm.SparkRunningJobParseBolt;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
 
 public class SparkRunningJobApp extends StormApplication {
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
index 5988273..d9c66e3 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
@@ -18,8 +18,8 @@
 
 package org.apache.eagle.jpm.spark.running;
 
-import com.typesafe.config.Config;
 import org.apache.eagle.common.config.ConfigOptionParser;
+import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,28 +33,36 @@ public class SparkRunningJobAppConfig implements Serializable {
     public String getEnv() {
         return env;
     }
+
     private String env;
 
-    ZKStateConfig getZkStateConfig() { return zkStateConfig; }
+    ZKStateConfig getZkStateConfig() {
+        return zkStateConfig;
+    }
+
     private ZKStateConfig zkStateConfig;
     private TopologyConfig topologyConfig;
-    public TopologyConfig getTopologyConfig(){
+
+    public TopologyConfig getTopologyConfig() {
         return topologyConfig;
     }
 
     public EagleServiceConfig getEagleServiceConfig() {
         return eagleServiceConfig;
     }
+
     private EagleServiceConfig eagleServiceConfig;
 
     public JobExtractorConfig getJobExtractorConfig() {
         return jobExtractorConfig;
     }
+
     private JobExtractorConfig jobExtractorConfig;
 
     public EndpointConfig getEndpointConfig() {
         return endpointConfig;
     }
+
     private EndpointConfig endpointConfig;
 
     public static class TopologyConfig implements Serializable {
@@ -100,6 +108,7 @@ public class SparkRunningJobAppConfig implements Serializable {
     public Config getConfig() {
         return config;
     }
+
     private Config config;
 
     private static SparkRunningJobAppConfig manager = new SparkRunningJobAppConfig();
@@ -127,7 +136,7 @@ public class SparkRunningJobAppConfig implements Serializable {
         return manager;
     }
 
-    private void init(Config config){
+    private void init(Config config) {
         this.config = config;
         this.env = config.getString("envContextConfig.env");
         this.zkStateConfig.zkQuorum = config.getString("zookeeperConfig.zkQuorum");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
index 7b8f648..7de1530 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkAppEntity.java
@@ -17,11 +17,11 @@
 
 package org.apache.eagle.jpm.spark.running.entities;
 
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.apache.eagle.jpm.util.Constants;
 
 @Table("eagleSparkRunningApps")
 @ColumnFamily("f")
@@ -265,11 +265,14 @@ public class SparkAppEntity extends TaggedLogAPIEntity {
         return driveMemoryBytes;
     }
 
-    public int getCompleteTasks(){ return completeTasks;}
+    public int getCompleteTasks() {
+        return completeTasks;
+    }
 
     public JobConfig getConfig() {
         return config;
     }
+
     public void setStartTime(long startTime) {
         this.startTime = startTime;
         valueChanged("startTime");
@@ -420,7 +423,7 @@ public class SparkAppEntity extends TaggedLogAPIEntity {
         valueChanged("driveMemoryBytes");
     }
 
-    public void setCompleteTasks(int completeTasks){
+    public void setCompleteTasks(int completeTasks) {
         this.completeTasks = completeTasks;
         valueChanged("completeTasks");
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
index f4de84c..89549ca 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkExecutorEntity.java
@@ -17,10 +17,10 @@
 
 package org.apache.eagle.jpm.spark.running.entities;
 
+import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.apache.eagle.jpm.util.Constants;
 
 @Table("eagleSparkRunningExecutors")
 @ColumnFamily("f")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
index 1c2caa4..bb56b52 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
@@ -17,10 +17,10 @@
 
 package org.apache.eagle.jpm.spark.running.entities;
 
+import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.apache.eagle.jpm.util.Constants;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
index 72dbe40..be0ffd0 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
@@ -17,10 +17,10 @@
 
 package org.apache.eagle.jpm.spark.running.entities;
 
+import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.apache.eagle.jpm.util.Constants;
 
 @Table("eagleSparkRunningStages")
 @ColumnFamily("f")
@@ -40,27 +40,27 @@ public class SparkStageEntity extends TaggedLogAPIEntity {
     @Column("d")
     private int numFailedTasks = 0;
     @Column("e")
-    private long executorRunTime = 0l;
+    private long executorRunTime = 0L;
     @Column("f")
-    private long inputBytes = 0l;
+    private long inputBytes = 0L;
     @Column("g")
-    private long inputRecords = 0l;
+    private long inputRecords = 0L;
     @Column("h")
-    private long outputBytes = 0l;
+    private long outputBytes = 0L;
     @Column("i")
-    private long outputRecords = 0l;
+    private long outputRecords = 0L;
     @Column("j")
-    private long shuffleReadBytes = 0l;
+    private long shuffleReadBytes = 0L;
     @Column("k")
-    private long shuffleReadRecords = 0l;
+    private long shuffleReadRecords = 0L;
     @Column("l")
-    private long shuffleWriteBytes = 0l;
+    private long shuffleWriteBytes = 0L;
     @Column("m")
-    private long shuffleWriteRecords = 0l;
+    private long shuffleWriteRecords = 0L;
     @Column("n")
-    private long memoryBytesSpilled = 0l;
+    private long memoryBytesSpilled = 0L;
     @Column("o")
-    private long diskBytesSpilled = 0l;
+    private long diskBytesSpilled = 0L;
     @Column("p")
     private String name;
     @Column("q")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
index 183a62a..e531806 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
@@ -17,10 +17,10 @@
 
 package org.apache.eagle.jpm.spark.running.entities;
 
+import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.apache.eagle.jpm.util.Constants;
 
 @Table("eagleSparkRunningTasks")
 @ColumnFamily("f")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
index b2a5b63..6411018 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
@@ -18,7 +18,6 @@
 
 package org.apache.eagle.jpm.spark.running.parser;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.jpm.spark.crawl.EventType;
 import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
 import org.apache.eagle.jpm.spark.running.entities.*;
@@ -27,9 +26,10 @@ import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.HDFSUtil;
 import org.apache.eagle.jpm.util.SparkJobTagName;
 import org.apache.eagle.jpm.util.Utils;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
-import org.apache.eagle.jpm.util.resourceFetch.model.*;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourcefetch.model.*;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -189,12 +189,12 @@ public class SparkApplicationParser implements Runnable {
                     //we must flush entities before delete from zk in case of missing finish state of jobs
                     //delete from zk if needed
                     sparkAppEntityMap.keySet()
-                            .stream()
-                            .filter(
-                                    jobId -> sparkAppEntityMap.get(jobId).getYarnState().equals(Constants.AppState.FINISHED.toString()) ||
-                                            sparkAppEntityMap.get(jobId).getYarnState().equals(Constants.AppState.FAILED.toString()))
-                            .forEach(
-                                    jobId -> this.sparkRunningJobManager.delete(app.getId(), jobId));
+                        .stream()
+                        .filter(
+                            jobId -> sparkAppEntityMap.get(jobId).getYarnState().equals(Constants.AppState.FINISHED.toString())
+                                || sparkAppEntityMap.get(jobId).getYarnState().equals(Constants.AppState.FAILED.toString()))
+                        .forEach(
+                            jobId -> this.sparkRunningJobManager.delete(app.getId(), jobId));
                 }
 
                 LOG.info("finish process yarn application " + app.getId());
@@ -243,12 +243,12 @@ public class SparkApplicationParser implements Runnable {
         JobConfig jobConfig = null;
 
         try (FileSystem hdfs = HDFSUtil.getFileSystem(this.hdfsConf)) {
-//             // For Yarn version >= 2.7,
-//             // log name: "application_1468625664674_0003_appattempt_1468625664674_0003_000001"
-//             String attemptIdFormatted = String.format("%06d", attemptId);
-//             // remove "application_" to get the number part of appID.
-//             String sparkAppIdNum = sparkAppId.substring(12);
-//             String attemptIdString = "appattempt_" + sparkAppIdNum + "_" + attemptIdFormatted;
+            //             // For Yarn version >= 2.7,
+            //             // log name: "application_1468625664674_0003_appattempt_1468625664674_0003_000001"
+            //             String attemptIdFormatted = String.format("%06d", attemptId);
+            //             // remove "application_" to get the number part of appID.
+            //             String sparkAppIdNum = sparkAppId.substring(12);
+            //             String attemptIdString = "appattempt_" + sparkAppIdNum + "_" + attemptIdFormatted;
 
             // For Yarn version 2.4.x
             // log name: application_1464382345557_269065_1
@@ -277,8 +277,8 @@ public class SparkApplicationParser implements Runnable {
     }
 
     private boolean isClientMode(JobConfig jobConfig) {
-        return jobConfig.containsKey(Constants.SPARK_MASTER_KEY) &&
-               jobConfig.get(Constants.SPARK_MASTER_KEY).equalsIgnoreCase("yarn-client");
+        return jobConfig.containsKey(Constants.SPARK_MASTER_KEY)
+            && jobConfig.get(Constants.SPARK_MASTER_KEY).equalsIgnoreCase("yarn-client");
     }
 
     private boolean fetchSparkApps() {
@@ -315,10 +315,10 @@ public class SparkApplicationParser implements Runnable {
                 lastSavedAttempt = Integer.parseInt(sparkAppEntityMap.get(id).getTags().get(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString()));
             }
             for (int j = lastSavedAttempt; j <= currentAttempt; j++) {
-                SparkAppEntity attemptEntity = new SparkAppEntity();
                 commonTags.put(SparkJobTagName.SPARK_APP_NAME.toString(), sparkApplication.getName());
                 commonTags.put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), "" + j);
                 commonTags.put(SparkJobTagName.SPARK_APP_ID.toString(), id);
+                SparkAppEntity attemptEntity = new SparkAppEntity();
                 attemptEntity.setTags(new HashMap<>(commonTags));
                 attemptEntity.setAppInfo(app);
 
@@ -340,9 +340,9 @@ public class SparkApplicationParser implements Runnable {
                     JobConfig jobConfig = attemptEntity.getConfig();
                     attemptEntity.setExecMemoryBytes(Utils.parseMemory(jobConfig.get(Constants.SPARK_EXECUTOR_MEMORY_KEY)));
 
-                    attemptEntity.setDriveMemoryBytes(isClientMode(jobConfig) ?
-                            0 :
-                            Utils.parseMemory(jobConfig.get(Constants.SPARK_DRIVER_MEMORY_KEY)));
+                    attemptEntity.setDriveMemoryBytes(isClientMode(jobConfig)
+                        ? 0
+                        : Utils.parseMemory(jobConfig.get(Constants.SPARK_DRIVER_MEMORY_KEY)));
                     attemptEntity.setExecutorCores(Integer.parseInt(jobConfig.get(Constants.SPARK_EXECUTOR_CORES_KEY)));
                     // spark.driver.cores may not be set.
                     String driverCoresStr = jobConfig.get(Constants.SPARK_DRIVER_CORES_KEY);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
index 11f7909..3fb6371 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/recover/SparkRunningJobManager.java
@@ -18,14 +18,15 @@
 
 package org.apache.eagle.jpm.spark.running.recover;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
 import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
 import org.apache.eagle.jpm.util.jobrecover.RunningJobManager;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+import org.apache.commons.lang3.tuple.Pair;
 
 import java.io.Serializable;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
 
 public class SparkRunningJobManager implements Serializable {
     private RunningJobManager runningJobManager;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
index 256829e..ccdfe79 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
@@ -28,9 +28,9 @@ import org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig;
 import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
 import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
 import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
index 21a6ef2..c41804b 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
@@ -23,9 +23,9 @@ import org.apache.eagle.jpm.spark.running.entities.SparkAppEntity;
 import org.apache.eagle.jpm.spark.running.parser.SparkApplicationParser;
 import org.apache.eagle.jpm.spark.running.recover.SparkRunningJobManager;
 import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourceFetch.RMResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
 
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0b852cbc/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index 07850f9..1a6c09c 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -21,19 +21,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Constants {
-    private final static Logger LOG = LoggerFactory.getLogger(Constants.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Constants.class);
 
     //SPARK
-    public final static String SPARK_APP_SERVICE_ENDPOINT_NAME = "SparkAppService";
-    public final static String SPARK_JOB_SERVICE_ENDPOINT_NAME = "SparkJobService";
-    public final static String SPARK_STAGE_SERVICE_ENDPOINT_NAME = "SparkStageService";
-    public final static String SPARK_TASK_SERVICE_ENDPOINT_NAME = "SparkTaskService";
-    public final static String SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "SparkExecutorService";
-    public final static String RUNNING_SPARK_APP_SERVICE_ENDPOINT_NAME = "RunningSparkAppService";
-    public final static String RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME = "RunningSparkJobService";
-    public final static String RUNNING_SPARK_STAGE_SERVICE_ENDPOINT_NAME = "RunningSparkStageService";
-    public final static String RUNNING_SPARK_TASK_SERVICE_ENDPOINT_NAME = "RunningSparkTaskService";
-    public final static String RUNNING_SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "RunningSparkExecutorService";
+    public static final String SPARK_APP_SERVICE_ENDPOINT_NAME = "SparkAppService";
+    public static final String SPARK_JOB_SERVICE_ENDPOINT_NAME = "SparkJobService";
+    public static final String SPARK_STAGE_SERVICE_ENDPOINT_NAME = "SparkStageService";
+    public static final String SPARK_TASK_SERVICE_ENDPOINT_NAME = "SparkTaskService";
+    public static final String SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "SparkExecutorService";
+    public static final String RUNNING_SPARK_APP_SERVICE_ENDPOINT_NAME = "RunningSparkAppService";
+    public static final String RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME = "RunningSparkJobService";
+    public static final String RUNNING_SPARK_STAGE_SERVICE_ENDPOINT_NAME = "RunningSparkStageService";
+    public static final String RUNNING_SPARK_TASK_SERVICE_ENDPOINT_NAME = "RunningSparkTaskService";
+    public static final String RUNNING_SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "RunningSparkExecutorService";
     public static final String APPLICATION_PREFIX = "application";
     public static final String JOB_PREFIX = "job";
     public static final String V2_APPS_URL = "ws/v1/cluster/apps";
@@ -53,7 +53,7 @@ public class Constants {
     public static final String SPARK_YARN_DRIVER_MEMORY_OVERHEAD_KEY = "spark.yarn.driver.memoryOverhead";
     public static final String SPARK_YARN_am_MEMORY_OVERHEAD_KEY = "spark.yarn.am.memoryOverhead";
 
-    public static final String SPARK_APPS_URL ="api/v1/applications";
+    public static final String SPARK_APPS_URL = "api/v1/applications";
     public static final String SPARK_EXECUTORS_URL = "executors";
     public static final String SPARK_JOBS_URL = "jobs";
     public static final String SPARK_STAGES_URL = "stages";
@@ -68,21 +68,27 @@ public class Constants {
     public enum CompressionType {
         GZIP, NONE
     }
+
     public enum JobState {
         NEW, INITED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED, ERROR, FINISHED, ALL
     }
+
     public enum TaskState {
         NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED
     }
+
     public enum StageState {
         ACTIVE, COMPLETE, PENDING
     }
+
     public enum AppState {
         NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
     }
+
     public enum AppStatus {
         UNDEFINED, SUCCEEDED, FAILED, KILLED
     }
+
     public enum ResourceType {
         COMPLETE_SPARK_JOB, SPARK_JOB_DETAIL, RUNNING_SPARK_JOB, RUNNING_MR_JOB, CLUSTER_INFO, JOB_CONFIGURATION,
         COMPLETE_MR_JOB
@@ -117,7 +123,7 @@ public class Constants {
     public static final String HIVE_QUERY_STRING = "hive.query.string";
 
     /**
-     * MR task types
+     * MR task types.
      */
     public enum TaskType {
         SETUP, MAP, REDUCE, CLEANUP
@@ -128,9 +134,10 @@ public class Constants {
         NOTAVALIABLE("N/A")
         ;
         private String value;
-        JobType(String value){
+        JobType(String value) {
             this.value = value;
         }
+
         @Override
         public String toString() {
             return this.value;