Posted to commits@eagle.apache.org by ha...@apache.org on 2016/08/24 07:49:11 UTC

[1/3] incubator-eagle git commit: [EAGLE-461] Convert MR history app with new app framework

Repository: incubator-eagle
Updated Branches:
  refs/heads/develop b52405f12 -> 0bde482be


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index 3daae37..ca4a94f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -18,19 +18,20 @@
 
 package org.apache.eagle.jpm.mr.history.storm;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.*;
-import org.apache.eagle.jpm.mr.historyentity.JobProcessTimeStampEntity;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.mr.historyentity.JobProcessTimeStampEntity;
 import org.apache.eagle.jpm.util.JobIdFilter;
 import org.apache.eagle.jpm.util.JobIdFilterByPartition;
 import org.apache.eagle.jpm.util.JobIdPartitioner;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,28 +44,30 @@ import java.util.Map;
 /**
  * Zookeeper znode structure
  * -zkRoot
- *   - partitions
- *      - 0 (20150101)
- *      - 1 (20150101)
- *      - 2 (20150101)
- *      - ... ...
- *      - N-1 (20150102)
- *   - jobs
- *      - 20150101
- *        - job1
- *        - job2
- *        - job3
- *      - 20150102
- *        - job1
- *        - job2
- *        - job3
- *
+ * - partitions
+ * - 0 (20150101)
+ * - 1 (20150101)
+ * - 2 (20150101)
+ * - ... ...
+ * - N-1 (20150102)
+ * - jobs
+ * - 20150101
+ * - job1
+ * - job2
+ * - job3
+ * - 20150102
+ * - job1
+ * - job2
+ * - job3
+ * <p>
  * Spout can have multiple instances, which is supported by storm parallelism primitive.
- *
+ * </p>
+ * <p>
  * Under znode partitions, N child znodes (name is 0 based integer) would be created with each znode mapped to one spout instance. All jobs will be partitioned into N
  * partitions by applying JobPartitioner class to each job Id. The value of each partition znode is the date when the last job in this partition
  * is successfully processed.
- *
+ * </p>
+ * <p>
  * processing steps
  * 1) In constructor,
 * 2) In open(), calculate jobPartitionId for the current spout (which should be exactly the same as the spout taskId within TopologyContext)
@@ -74,10 +77,9 @@ import java.util.Map;
  * 7) process job files (job history file and job configuration xml file)
  * 8) add job Id to current date slot say for example 20150102 after this job is successfully processed
  * 9) clean up all slots with date less than currentProcessDate - 2 days. (2 days should be configurable)
- *
+ * </p>
  * Note:
 * if one spout instance crashes and is brought up again, the open() method will be invoked again; we need to account for this scenario.
- *
  */
 
 public class JobHistorySpout extends BaseRichSpout {
@@ -90,20 +92,18 @@ public class JobHistorySpout extends BaseRichSpout {
     private JobHistoryContentFilter contentFilter;
     private JobHistorySpoutCollectorInterceptor interceptor;
     private JHFInputStreamCallback callback;
-    private JHFConfigManager configManager;
-    private JobHistoryLCM m_jhfLCM;
-    private final static int MAX_RETRY_TIMES = 3;
+    private MRHistoryJobConfig configManager;
+    private JobHistoryLCM jhfLCM;
+    private static final int MAX_RETRY_TIMES = 3;
 
-    public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager) {
+    public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig configManager) {
         this(filter, configManager, new JobHistorySpoutCollectorInterceptor());
     }
 
     /**
-     * mostly this constructor signature is for unit test purpose as you can put customized interceptor here
-     * @param filter
-     * @param adaptor
+     * This constructor signature exists mostly for unit testing, as it allows a customized interceptor to be supplied.
      */
-    public JobHistorySpout(JobHistoryContentFilter filter, JHFConfigManager configManager, JobHistorySpoutCollectorInterceptor adaptor) {
+    public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig configManager, JobHistorySpoutCollectorInterceptor adaptor) {
         this.contentFilter = filter;
         this.configManager = configManager;
         this.interceptor = adaptor;
@@ -131,15 +131,15 @@ public class JobHistorySpout extends BaseRichSpout {
         partitionId = calculatePartitionId(context);
         // sanity verify 0<=partitionId<=numTotalPartitions-1
         if (partitionId < 0 || partitionId >= numTotalPartitions) {
-            throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId " +
-                    partitionId + " and numTotalPartitions " + numTotalPartitions);
+            throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId "
+                + partitionId + " and numTotalPartitions " + numTotalPartitions);
         }
         Class<? extends JobIdPartitioner> partitionerCls = configManager.getControlConfig().partitionerCls;
         JobIdPartitioner partitioner;
         try {
             partitioner = partitionerCls.newInstance();
         } catch (Exception e) {
-            LOG.error("failing instantiating job partitioner class " + partitionerCls,e);
+            LOG.error("failing instantiating job partitioner class " + partitionerCls, e);
             throw new IllegalStateException(e);
         }
         JobIdFilter jobIdFilter = new JobIdFilterByPartition(partitioner, numTotalPartitions, partitionId);
@@ -148,14 +148,14 @@ public class JobHistorySpout extends BaseRichSpout {
         interceptor.setSpoutOutputCollector(collector);
 
         try {
-            m_jhfLCM = new JobHistoryDAOImpl(configManager.getJobHistoryEndpointConfig());
+            jhfLCM = new JobHistoryDAOImpl(configManager.getJobHistoryEndpointConfig());
             driver = new JHFCrawlerDriverImpl(configManager.getJobHistoryEndpointConfig(),
-                    configManager.getControlConfig(),
-                    callback,
-                    zkState,
-                    m_jhfLCM,
-                    jobIdFilter,
-                    partitionId);
+                configManager.getControlConfig(),
+                callback,
+                zkState,
+                jhfLCM,
+                jobIdFilter,
+                partitionId);
         } catch (Exception e) {
             LOG.error("failing creating crawler driver");
             throw new IllegalStateException(e);
@@ -171,7 +171,7 @@ public class JobHistorySpout extends BaseRichSpout {
         } catch (Exception ex) {
             LOG.error("fail crawling job history file and continue ...", ex);
             try {
-                m_jhfLCM.freshFileSystem();
+                jhfLCM.freshFileSystem();
             } catch (Exception e) {
                 LOG.error("failed to fresh file system ", e);
             }
@@ -179,27 +179,27 @@ public class JobHistorySpout extends BaseRichSpout {
             try {
                 Thread.sleep(1000);
             } catch (Exception e) {
-
+                // ignored
             }
         }
     }
 
     /**
-     * empty because framework will take care of output fields declaration
+     * empty because the framework takes care of the output fields declaration.
      */
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
     }
 
     /**
-     * add to processedJob
+     * add to processedJob.
      */
     @Override
     public void ack(Object jobId) {
     }
 
     /**
-     * job is not fully processed
+     * job is not fully processed.
      */
     @Override
     public void fail(Object jobId) {
@@ -227,26 +227,28 @@ public class JobHistorySpout extends BaseRichSpout {
             }
         }
 
-        if (minTimeStamp == 0l) {
+        if (minTimeStamp == 0L) {
             return;
         }
 
         LOG.info("update process time stamp {}", minTimeStamp);
-        final JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-        final JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
-        Map<String, String> baseTags = new HashMap<String, String>() { {
-            put("site", jobExtractorConfig.site);
-        } };
+        final MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+        final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+        Map<String, String> baseTags = new HashMap<String, String>() {
+            {
+                put("site", jobExtractorConfig.site);
+            }
+        };
         JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity();
         entity.setCurrentTimeStamp(minTimeStamp);
         entity.setTimestamp(minTimeStamp);
         entity.setTags(baseTags);
 
         IEagleServiceClient client = new EagleServiceClientImpl(
-                eagleServiceConfig.eagleServiceHost,
-                eagleServiceConfig.eagleServicePort,
-                eagleServiceConfig.username,
-                eagleServiceConfig.password);
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
         client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
 
@@ -267,7 +269,7 @@ public class JobHistorySpout extends BaseRichSpout {
                     LOG.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
                 }
             }
-            tried ++;
+            tried++;
         }
 
         client.getJerseyClient().destroy();
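
The Javadoc above describes the partitioning contract: every job id maps deterministically to one of N partitions, and each spout instance processes only its own partition. A minimal standalone sketch of that idea (the method names below are illustrative assumptions, not the actual JobIdPartitioner/JobIdFilterByPartition API):

    // Illustrative sketch: a stable mapping from a job id to one of N partitions.
    static int partitionOf(String jobId, int numTotalPartitions) {
        // floorMod keeps the result in [0, N) even for negative hash codes
        return Math.floorMod(jobId.hashCode(), numTotalPartitions);
    }

    // Each spout instance keeps only the job ids of its own partition, which is the
    // role JobIdFilterByPartition(partitioner, numTotalPartitions, partitionId)
    // plays with the configured partitioner class.
    static boolean accept(String jobId, int numTotalPartitions, int partitionId) {
        return partitionOf(jobId, numTotalPartitions) == partitionId;
    }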

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
index 933b347..cbde88c 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateLCM.java
@@ -22,12 +22,20 @@ import java.util.List;
 
 public interface JobHistoryZKStateLCM {
     void ensureJobPartitions(int numTotalPartitions);
+
     String readProcessedDate(int partitionId);
+
     List<String> readProcessedJobs(String date);
+
     void updateProcessedDate(int partitionId, String date);
+
     void addProcessedJob(String date, String jobId);
+
     void truncateProcessedJob(String date);
+
     void truncateEverything();
+
     long readProcessedTimeStamp(int partitionId);
+
     void updateProcessedTimeStamp(int partitionId, long timeStamp);
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
index 33d3cb2..feb896e 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/zkres/JobHistoryZKStateManager.java
@@ -18,11 +18,12 @@
 
 package org.apache.eagle.jpm.mr.history.zkres;
 
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.ZKStateConfig;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.curator.retry.RetryNTimes;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager.ZKStateConfig;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,10 +46,10 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
 
     private CuratorFramework newCurator(ZKStateConfig config) throws Exception {
         return CuratorFrameworkFactory.newClient(
-                config.zkQuorum,
-                config.zkSessionTimeoutMs,
-                15000,
-                new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval)
+            config.zkQuorum,
+            config.zkSessionTimeoutMs,
+            15000,
+            new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval)
         );
     }
 
@@ -86,7 +87,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
             if (_curator.checkExists().forPath(path) != null) {
                 _curator.delete().forPath(path);
             }
-        } catch(Exception ex) {
+        } catch (Exception ex) {
             LOG.error("fail reading forceStartFrom znode", ex);
         }
     }
@@ -102,27 +103,28 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
     /**
      * under zkRoot, the znode forceStartFrom is used to force jobs to be crawled from that date
      * IF
-     *    forceStartFrom znode is provided, and its value is valid date with format "YYYYMMDD",
+     * forceStartFrom znode is provided, and its value is valid date with format "YYYYMMDD",
      * THEN
-     *    rebuild all partitions with the forceStartFrom
+     * rebuild all partitions with the forceStartFrom
      * ELSE
-     *    IF
-     *       partition structure is changed
-     *    THEN
-     *       IF
-     *          there is valid mindate for existing partitions
-     *       THEN
-     *          rebuild job partitions from that valid mindate
-     *       ELSE
-     *          rebuild job partitions from (today - BACKOFF_DAYS)
-     *       END
-     *    ELSE
-     *      do nothing
-     *    END
+     * IF
+     * partition structure is changed
+     * THEN
+     * IF
+     * there is valid mindate for existing partitions
+     * THEN
+     * rebuild job partitions from that valid mindate
+     * ELSE
+     * rebuild job partitions from (today - BACKOFF_DAYS)
+     * END
+     * ELSE
+     * do nothing
      * END
-     *
-     *
+     * END
+     * <p>
      * forceStartFrom is deleted once its value is used, so the next time the topology is restarted, the program can resume from where it last stopped.
+     * </p>
      */
     @Override
     public void ensureJobPartitions(int numTotalPartitions) {
@@ -137,7 +139,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
             if (forceStartFrom != null) {
                 try {
                     minDate = Integer.valueOf(forceStartFrom);
-                } catch(Exception ex) {
+                } catch (Exception ex) {
                     LOG.error("failing converting forceStartFrom znode value to integer with value " + forceStartFrom);
                     throw new IllegalStateException();
                 }
@@ -153,16 +155,18 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
                         LOG.info("znode partitions structure is changed, current partition count " + currentCount + ", future count " + numTotalPartitions);
                     }
                 }
-                if (!structureChanged)
+                if (!structureChanged) {
                     return; // do nothing
+                }
 
                 if (pathExists) {
                     List<String> partitions = _curator.getChildren().forPath(path);
                     for (String partition : partitions) {
                         String date = new String(_curator.getData().forPath(path + "/" + partition), "UTF-8");
                         int tmp = Integer.valueOf(date);
-                        if(tmp < minDate)
+                        if (tmp < minDate) {
                             minDate = tmp;
+                        }
                     }
                 }
 
@@ -178,7 +182,7 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
         } finally {
             try {
                 lock.release();
-            } catch(Exception e) {
+            } catch (Exception e) {
                 LOG.error("fail releasing lock", e);
                 throw new RuntimeException(e);
             }
@@ -195,9 +199,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
 
         for (int i = 0; i < numTotalPartitions; i++) {
             _curator.create()
-                    .creatingParentsIfNeeded()
-                    .withMode(CreateMode.PERSISTENT)
-                    .forPath(path + "/" + i, startingDate.getBytes("UTF-8"));
+                .creatingParentsIfNeeded()
+                .withMode(CreateMode.PERSISTENT)
+                .forPath(path + "/" + i, startingDate.getBytes("UTF-8"));
         }
     }
 
@@ -222,9 +226,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
         try {
             if (_curator.checkExists().forPath(path) == null) {
                 _curator.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path, date.getBytes("UTF-8"));
+                    .creatingParentsIfNeeded()
+                    .withMode(CreateMode.PERSISTENT)
+                    .forPath(path, date.getBytes("UTF-8"));
             } else {
                 _curator.setData().forPath(path, date.getBytes("UTF-8"));
             }
@@ -240,9 +244,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
         try {
             if (_curator.checkExists().forPath(path) == null) {
                 _curator.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path);
+                    .creatingParentsIfNeeded()
+                    .withMode(CreateMode.PERSISTENT)
+                    .forPath(path);
             } else {
                 _curator.setData().forPath(path);
             }
@@ -311,10 +315,10 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
         try {
             if (_curator.checkExists().forPath(path) == null) {
                 _curator.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path);
-                return 0l;
+                    .creatingParentsIfNeeded()
+                    .withMode(CreateMode.PERSISTENT)
+                    .forPath(path);
+                return 0L;
             } else {
                 return Long.parseLong(new String(_curator.getData().forPath(path), "UTF-8"));
             }
@@ -330,9 +334,9 @@ public class JobHistoryZKStateManager implements JobHistoryZKStateLCM {
         try {
             if (_curator.checkExists().forPath(path) == null) {
                 _curator.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path);
+                    .creatingParentsIfNeeded()
+                    .withMode(CreateMode.PERSISTENT)
+                    .forPath(path);
             }
 
             _curator.setData().forPath(path, (timeStamp + "").getBytes("UTF-8"));
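
The IF/THEN/ELSE Javadoc on ensureJobPartitions above compresses into a short decision flow. Restated as a sketch, with hypothetical helper names standing in for the Curator calls in the actual method:

    // Illustrative restatement of the documented logic; helper names are assumed.
    Integer forced = readForceStartFrom();            // "YYYYMMDD" znode value, or null
    if (forced != null) {
        rebuildPartitionsFrom(forced);                // the znode is deleted once used
    } else if (partitionStructureChanged()) {
        Integer minDate = minDateOfExistingPartitions();
        if (minDate != null) {
            rebuildPartitionsFrom(minDate);           // valid mindate found
        } else {
            rebuildPartitionsFrom(daysAgo(BACKOFF_DAYS));
        }
    }                                                 // else: do nothing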

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
new file mode 100644
index 0000000..5e69a16
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<application>
+    <type>MR_HISTORY_JOB_APP</type>
+    <name>MR History Job Monitoring</name>
+    <version>0.5.0-incubating</version>
+    <appClass>org.apache.eagle.jpm.mr.history.MRHistoryJobApplication</appClass>
+    <viewPath>/apps/jpm</viewPath>
+    <configuration>
+        <!-- org.apache.eagle.jpm.mr.history.MRHistoryJobConfig -->
+        <property>
+            <name>jobExtractorConfig.site</name>
+            <displayName>Site ID</displayName>
+            <value>sandbox</value>
+        </property>
+        <property>
+            <name>jobExtractorConfig.mrVersion</name>
+            <value>MRVer2</value>
+        </property>
+        <property>
+            <name>jobExtractorConfig.readTimeOutSeconds</name>
+            <displayName>Read Timeout Seconds</displayName>
+            <value>10</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkQuorum</name>
+            <value>sandbox.hortonworks.com:2181</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkPort</name>
+            <value>2181</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkSessionTimeoutMs</name>
+            <value>15000</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkRetryTimes</name>
+            <value>3</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkRetryInterval</name>
+            <value>20000</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkRoot</name>
+            <value>/test_mrjobhistory</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.basePath</name>
+            <value>/mr-history/done</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.jobTrackerName</name>
+            <value></value>
+        </property>
+        <property>
+            <name>dataSourceConfig.nnEndpoint</name>
+            <value>hdfs://sandbox.hortonworks.com:8020</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.pathContainsJobTrackerName</name>
+            <value>false</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.principal</name>
+            <value></value>
+        </property>
+        <property>
+            <name>dataSourceConfig.keytab</name>
+            <value></value>
+        </property>
+        <property>
+            <name>dataSourceConfig.dryRun</name>
+            <value>false</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.partitionerCls</name>
+            <value>org.apache.eagle.jpm.util.DefaultJobIdPartitioner</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.zeroBasedMonth</name>
+            <value>false</value>
+        </property>
+        <property>
+            <name>MRConfigureKeys.jobConfigKey</name>
+            <value>mapreduce.map.output.compress,
+                mapreduce.map.output.compress.codec,
+                mapreduce.output.fileoutputformat.compress,
+                mapreduce.output.fileoutputformat.compress.type,
+                mapreduce.output.fileoutputformat.compress.codec,
+                mapred.output.format.class,
+                dataplatform.etl.info,
+                mapreduce.map.memory.mb,
+                mapreduce.reduce.memory.mb,
+                mapreduce.map.java.opts,
+                mapreduce.reduce.java.opts</value>
+        </property>
+        <property>
+            <name>MRConfigureKeys.jobNameKey</name>
+            <value>eagle.job.name</value>
+        </property>
+        <property>
+            <name>envContextConfig.parallelismConfig.mrHistoryJobExecutor</name>
+            <value>6</value>
+        </property>
+        <property>
+            <name>envContextConfig.tasks.mrHistoryJobExecutor</name>
+            <value>6</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.host</name>
+            <description>eagleProps.eagleService.host</description>
+            <value>sandbox.hortonworks.com</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.port</name>
+            <description>eagleProps.eagleService.port</description>
+            <value>9099</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.username</name>
+            <description>eagleProps.eagleService.username</description>
+            <value>admin</value>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.password</name>
+            <description>eagleProps.eagleService.password</description>
+            <value>secret</value>
+        </property>
+    </configuration>
+    <docs>
+        <install>
+        </install>
+        <uninstall>
+        </uninstall>
+    </docs>
+</application>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..56a30bd
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider
\ No newline at end of file
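
This services file follows the standard JDK ServiceLoader convention, so the framework can discover the provider from the classpath without any explicit registration code. A minimal sketch of how such an entry is typically consumed (the Eagle app framework presumably wraps this; ApplicationProvider is used as a raw type here for brevity):

    import java.util.ServiceLoader;
    import org.apache.eagle.app.spi.ApplicationProvider;

    // Scans every META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
    // resource on the classpath and instantiates each listed class via its
    // no-arg constructor.
    ServiceLoader<ApplicationProvider> providers = ServiceLoader.load(ApplicationProvider.class);
    for (ApplicationProvider provider : providers) {
        System.out.println("Discovered provider: " + provider.getClass().getName());
    }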

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index 23a51fc..13e411f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -15,9 +15,6 @@
 
 {
   "envContextConfig" : {
-    "env" : "local",
-    "topologyName" : "mr_history",
-    "stormConfigFile" : "storm.yaml",
     "parallelismConfig" : {
       "mrHistoryJobExecutor" : 6
     },
@@ -62,21 +59,10 @@
       "password": "secret"
     }
   },
-
+  "appId":"mr_history",
+  "mode":"LOCAL",
   "MRConfigureKeys" : {
     "jobNameKey" : "eagle.job.name",
-    "jobConfigKey" : [
-        "mapreduce.map.output.compress",
-        "mapreduce.map.output.compress.codec",
-        "mapreduce.output.fileoutputformat.compress",
-        "mapreduce.output.fileoutputformat.compress.type",
-        "mapreduce.output.fileoutputformat.compress.codec",
-        "mapred.output.format.class",
-        "dataplatform.etl.info",
-        "mapreduce.map.memory.mb",
-        "mapreduce.reduce.memory.mb",
-        "mapreduce.map.java.opts",
-        "mapreduce.reduce.java.opts"
-    ]
+    "jobConfigKey" : "mapreduce.map.output.compress,mapreduce.map.output.compress.codec,mapreduce.output.fileoutputformat.compress,mapreduce.output.fileoutputformat.compress.type,mapreduce.output.fileoutputformat.compress.codec,mapred.output.format.class, dataplatform.etl.info,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.java.opts,mapreduce.reduce.java.opts"
   }
 }
\ No newline at end of file
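
Note that jobConfigKey changes here from an HOCON string list to a single comma-separated string. The new MRHistoryJobApplication in this commit parses it back into a list, trimming whitespace around each entry:

    // From MRHistoryJobApplication.execute() below:
    String[] confKeyPatternsSplit = jhfAppConf.getString("MRConfigureKeys.jobConfigKey").split(",");
    List<String> confKeyPatterns = new ArrayList<>(confKeyPatternsSplit.length);
    for (String confKeyPattern : confKeyPatternsSplit) {
        confKeyPatterns.add(confKeyPattern.trim());
    }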

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java
new file mode 100644
index 0000000..0a3a3a1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProviderTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.history;
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.test.AppJUnitRunner;
+import org.apache.eagle.app.test.ApplicationSimulator;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(AppJUnitRunner.class)
+public class MRHistoryJobApplicationProviderTest {
+    @Inject private ApplicationSimulator simulator;
+
+    @Test
+    public void testRunAsManagedApplicationWithSimulator() {
+        simulator.start(MRHistoryJobApplicationProvider.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java
new file mode 100644
index 0000000..318a641
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.history;
+
+import com.typesafe.config.ConfigFactory;
+import org.junit.Test;
+
+public class MRHistoryJobApplicationTest {
+    @Test
+    public void testRun() {
+        new MRHistoryJobApplication().run(ConfigFactory.load());
+    }
+}


[3/3] incubator-eagle git commit: [EAGLE-461] Convert MR history app with new app framework

Posted by ha...@apache.org.
[EAGLE-461] Convert MR history app with new app framework

https://issues.apache.org/jira/browse/EAGLE-461

Author: Hao Chen <ha...@apache.org>

Closes #380 from haoch/EAGLE-461.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/0bde482b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/0bde482b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/0bde482b

Branch: refs/heads/develop
Commit: 0bde482be40f208ba944d32ece23533714c87133
Parents: b52405f
Author: Hao Chen <ha...@apache.org>
Authored: Wed Aug 24 15:48:58 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Aug 24 15:48:58 2016 +0800

----------------------------------------------------------------------
 eagle-jpm/eagle-jpm-mr-history/pom.xml          |   5 +
 .../jpm/mr/history/MRHistoryJobApplication.java |  75 +++
 .../MRHistoryJobApplicationProvider.java        |  26 +
 .../jpm/mr/history/MRHistoryJobConfig.java      | 208 +++++++
 .../eagle/jpm/mr/history/MRHistoryJobMain.java  |  70 +--
 .../jpm/mr/history/common/JHFConfigManager.java | 182 ------
 .../crawler/DefaultJHFInputStreamCallback.java  |   8 +-
 .../history/crawler/JHFCrawlerDriverImpl.java   |   6 +-
 .../mr/history/crawler/JobHistoryDAOImpl.java   |   2 +-
 .../HistoryJobEntityCreationListener.java       |   6 +-
 .../HistoryJobEntityLifecycleListener.java      |   5 +-
 .../jpm/mr/history/parser/ImportException.java  |   2 -
 .../mr/history/parser/JHFEventReaderBase.java   | 302 +++++-----
 .../mr/history/parser/JHFMRVer1EventReader.java |  30 +-
 .../jpm/mr/history/parser/JHFMRVer1Parser.java  | 319 ++++++-----
 .../parser/JHFMRVer1PerLineListener.java        |  14 +-
 .../mr/history/parser/JHFMRVer2EventReader.java | 553 +++++++++++++------
 .../jpm/mr/history/parser/JHFMRVer2Parser.java  |  55 +-
 .../jpm/mr/history/parser/JHFParserBase.java    |   3 +-
 .../jpm/mr/history/parser/JHFParserFactory.java |  44 +-
 .../parser/JHFWriteNotCompletedException.java   |   6 +-
 ...JobConfigurationCreationServiceListener.java |  24 +-
 .../JobEntityCreationEagleServiceListener.java  |  45 +-
 .../parser/JobEntityCreationPublisher.java      |   9 +-
 .../parser/JobEntityLifecycleAggregator.java    |  37 +-
 .../mr/history/parser/MRErrorClassifier.java    |  17 +-
 .../jpm/mr/history/parser/RecordTypes.java      |   5 +-
 .../parser/TaskAttemptCounterListener.java      |  54 +-
 .../mr/history/parser/TaskFailureListener.java  | 142 ++---
 .../jpm/mr/history/storm/JobHistorySpout.java   | 122 ++--
 .../mr/history/zkres/JobHistoryZKStateLCM.java  |   8 +
 .../history/zkres/JobHistoryZKStateManager.java |  90 +--
 ....history.MRHistoryJobApplicationProvider.xml | 154 ++++++
 ...org.apache.eagle.app.spi.ApplicationProvider |  16 +
 .../src/main/resources/application.conf         |  20 +-
 .../MRHistoryJobApplicationProviderTest.java    |  33 ++
 .../mr/history/MRHistoryJobApplicationTest.java |  27 +
 37 files changed, 1633 insertions(+), 1091 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml
index bfd4cf2..1ffda6a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml
@@ -104,6 +104,11 @@
             <artifactId>hadoop-mapreduce-client-core</artifactId>
             <version>${hadoop.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-app-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
new file mode 100644
index 0000000..08607a1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.history;
+
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
+import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
+import org.apache.eagle.jpm.util.Constants;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import com.typesafe.config.Config;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class MRHistoryJobApplication extends StormApplication {
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        //1. trigger init conf
+        MRHistoryJobConfig appConfig = MRHistoryJobConfig.getInstance(config);
+        com.typesafe.config.Config jhfAppConf = appConfig.getConfig();
+
+        //2. init JobHistoryContentFilter
+        final JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile();
+        String[] confKeyPatternsSplit = jhfAppConf.getString("MRConfigureKeys.jobConfigKey").split(",");
+        List<String> confKeyPatterns = new ArrayList<>(confKeyPatternsSplit.length);
+        for (String confKeyPattern : confKeyPatternsSplit) {
+            confKeyPatterns.add(confKeyPattern.trim());
+        }
+        confKeyPatterns.add(Constants.JobConfiguration.CASCADING_JOB);
+        confKeyPatterns.add(Constants.JobConfiguration.HIVE_JOB);
+        confKeyPatterns.add(Constants.JobConfiguration.PIG_JOB);
+        confKeyPatterns.add(Constants.JobConfiguration.SCOOBI_JOB);
+
+        String jobNameKey = jhfAppConf.getString("MRConfigureKeys.jobNameKey");
+        builder.setJobNameKey(jobNameKey);
+
+        for (String key : confKeyPatterns) {
+            builder.includeJobKeyPatterns(Pattern.compile(key));
+        }
+        JobHistoryContentFilter filter = builder.build();
+        //3. init topology
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        String spoutName = "mrHistoryJobExecutor";
+        int parallelism = jhfAppConf.getInt("envContextConfig.parallelismConfig." + spoutName);
+        int tasks = jhfAppConf.getInt("envContextConfig.tasks." + spoutName);
+        if (parallelism > tasks) {
+            parallelism = tasks;
+        }
+        topologyBuilder.setSpout(
+            spoutName,
+            new JobHistorySpout(filter, appConfig),
+            parallelism
+        ).setNumTasks(tasks);
+        return topologyBuilder.createTopology();
+    }
+}
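
With topology construction moved into execute(), launching the application collapses to a single call. Both the slimmed-down MRHistoryJobMain and the new unit test in this commit use the same entry point:

    // Local-mode run, exactly as exercised by MRHistoryJobApplicationTest below;
    // "mode" : "LOCAL" in the new application.conf replaces the old
    // envContextConfig.env switch between local and cluster submission.
    new MRHistoryJobApplication().run(ConfigFactory.load());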

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java
new file mode 100644
index 0000000..9aa1c61
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.mr.history;
+
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+
+public class MRHistoryJobApplicationProvider extends AbstractApplicationProvider<MRHistoryJobApplication> {
+    @Override
+    public MRHistoryJobApplication getApplication() {
+        return new MRHistoryJobApplication();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
new file mode 100644
index 0000000..ae86904
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history;
+
+import org.apache.eagle.common.config.ConfigOptionParser;
+import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
+import org.apache.eagle.jpm.util.JobIdPartitioner;
+
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class MRHistoryJobConfig implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(MRHistoryJobConfig.class);
+
+    private static final String JOB_CONFIGURE_KEY_CONF_FILE = "JobConfigKeys.conf";
+
+    public String getEnv() {
+        return env;
+    }
+
+    private String env;
+
+    public ZKStateConfig getZkStateConfig() {
+        return zkStateConfig;
+    }
+
+    private ZKStateConfig zkStateConfig;
+
+    public JobHistoryEndpointConfig getJobHistoryEndpointConfig() {
+        return jobHistoryEndpointConfig;
+    }
+
+    private JobHistoryEndpointConfig jobHistoryEndpointConfig;
+
+    public ControlConfig getControlConfig() {
+        return controlConfig;
+    }
+
+    private ControlConfig controlConfig;
+
+    public JobExtractorConfig getJobExtractorConfig() {
+        return jobExtractorConfig;
+    }
+
+    private JobExtractorConfig jobExtractorConfig;
+
+    public EagleServiceConfig getEagleServiceConfig() {
+        return eagleServiceConfig;
+    }
+
+    private EagleServiceConfig eagleServiceConfig;
+
+    public Config getConfig() {
+        return config;
+    }
+
+    private Config config;
+
+    public static class ZKStateConfig implements Serializable {
+        public String zkQuorum;
+        public String zkRoot;
+        public int zkSessionTimeoutMs;
+        public int zkRetryTimes;
+        public int zkRetryInterval;
+        public String zkPort;
+    }
+
+    public static class JobHistoryEndpointConfig implements Serializable {
+        public String nnEndpoint;
+        public String basePath;
+        public boolean pathContainsJobTrackerName;
+        public String jobTrackerName;
+        public String principal;
+        public String keyTab;
+    }
+
+    public static class ControlConfig implements Serializable {
+        public boolean dryRun;
+        public Class<? extends JobIdPartitioner> partitionerCls;
+        public boolean zeroBasedMonth;
+        public String timeZone;
+    }
+
+    public static class JobExtractorConfig implements Serializable {
+        public String site;
+        public String mrVersion;
+        public int readTimeoutSeconds;
+    }
+
+    public static class EagleServiceConfig implements Serializable {
+        public String eagleServiceHost;
+        public int eagleServicePort;
+        public String username;
+        public String password;
+    }
+
+    private static MRHistoryJobConfig manager = new MRHistoryJobConfig();
+
+    /**
+     * Because this singleton is constructed while the class is being initialized,
+     * any exception thrown in this constructor is wrapped in java.lang.ExceptionInInitializerError,
+     * which is unrecoverable and hard to troubleshoot.
+     */
+    private MRHistoryJobConfig() {
+        this.zkStateConfig = new ZKStateConfig();
+        this.jobHistoryEndpointConfig = new JobHistoryEndpointConfig();
+        this.controlConfig = new ControlConfig();
+        this.jobExtractorConfig = new JobExtractorConfig();
+        this.eagleServiceConfig = new EagleServiceConfig();
+    }
+
+    public static MRHistoryJobConfig getInstance(String[] args) {
+        manager.init(args);
+        return manager;
+    }
+
+    public static MRHistoryJobConfig getInstance(Config config) {
+        manager.init(config);
+        return manager;
+    }
+
+    /**
+     * read configuration file and load hbase config etc.
+     */
+    private void init(String[] args) {
+        // TODO: Probably we can remove the properties file path check in future
+        try {
+            LOG.info("Loading from configuration file");
+            init(new ConfigOptionParser().load(args));
+        } catch (Exception e) {
+            LOG.error("failed to load config", e);
+        }
+    }
+
+    /**
+     * read configuration file and load hbase config etc.
+     */
+    private void init(Config config) {
+        this.config = config;
+        this.env = config.getString("envContextConfig.env");
+        //parse eagle job extractor
+        this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
+        this.jobExtractorConfig.mrVersion = config.getString("jobExtractorConfig.mrVersion");
+        this.jobExtractorConfig.readTimeoutSeconds = config.getInt("jobExtractorConfig.readTimeOutSeconds");
+        //parse eagle zk
+        this.zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+        this.zkStateConfig.zkPort = config.getString("dataSourceConfig.zkPort");
+        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+        this.zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+        this.zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
+
+        //parse job history endpoint
+        this.jobHistoryEndpointConfig.basePath = config.getString("dataSourceConfig.basePath");
+        this.jobHistoryEndpointConfig.jobTrackerName = config.getString("dataSourceConfig.jobTrackerName");
+        this.jobHistoryEndpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint");
+        this.jobHistoryEndpointConfig.pathContainsJobTrackerName = config.getBoolean("dataSourceConfig.pathContainsJobTrackerName");
+        this.jobHistoryEndpointConfig.principal = config.getString("dataSourceConfig.principal");
+        this.jobHistoryEndpointConfig.keyTab = config.getString("dataSourceConfig.keytab");
+
+        //parse control config
+        this.controlConfig.dryRun = config.getBoolean("dataSourceConfig.dryRun");
+        try {
+            this.controlConfig.partitionerCls = (Class<? extends JobIdPartitioner>) Class.forName(config.getString("dataSourceConfig.partitionerCls"));
+            assert this.controlConfig.partitionerCls != null;
+        } catch (Exception e) {
+            LOG.warn("can not initialize partitioner class, use org.apache.eagle.jpm.util.DefaultJobIdPartitioner", e);
+            this.controlConfig.partitionerCls = DefaultJobIdPartitioner.class;
+        } finally {
+            LOG.info("Loaded partitioner class: {}", this.controlConfig.partitionerCls);
+        }
+        this.controlConfig.zeroBasedMonth = config.getBoolean("dataSourceConfig.zeroBasedMonth");
+        this.controlConfig.timeZone = config.getString("dataSourceConfig.timeZone");
+
+        // parse eagle service endpoint
+        this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+        String port = config.getString("eagleProps.eagleService.port");
+        this.eagleServiceConfig.eagleServicePort = (port == null ? 8080 : Integer.parseInt(port));
+        this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
+        this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
+
+        LOG.info("Successfully initialized MRHistoryJobConfig");
+        LOG.info("env: " + this.env);
+        LOG.info("zookeeper.quorum: " + this.zkStateConfig.zkQuorum);
+        LOG.info("zookeeper.property.clientPort: " + this.zkStateConfig.zkPort);
+        LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
+        LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
+    }
+}
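
MRHistoryJobConfig is a process-wide singleton, and getInstance(Config) both initializes and returns it. Typical usage inside the app, as seen in MRHistoryJobApplication.execute() above:

    // Trigger config parsing once, then hand the typed sub-configs around.
    MRHistoryJobConfig appConfig = MRHistoryJobConfig.getInstance(config);
    Config jhfAppConf = appConfig.getConfig();
    // Typed views: appConfig.getZkStateConfig(), appConfig.getJobHistoryEndpointConfig(),
    // appConfig.getControlConfig(), appConfig.getJobExtractorConfig(), ...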

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
index 9f030a7..bef72cc 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
@@ -18,74 +18,8 @@
 
 package org.apache.eagle.jpm.mr.history;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
-import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
-import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
-import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
-import org.apache.eagle.jpm.util.Constants;
-
-import java.util.List;
-import java.util.regex.Pattern;
-
 public class MRHistoryJobMain {
     public static void main(String []args) {
-        try {
-            //1. trigger init conf
-            JHFConfigManager jhfConfigManager = JHFConfigManager.getInstance(args);
-            com.typesafe.config.Config jhfAppConf = jhfConfigManager.getConfig();
-
-            //2. init JobHistoryContentFilter
-            JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile();
-            List<String> confKeyPatterns = jhfAppConf.getStringList("MRConfigureKeys.jobConfigKey");
-            confKeyPatterns.add(Constants.JobConfiguration.CASCADING_JOB);
-            confKeyPatterns.add(Constants.JobConfiguration.HIVE_JOB);
-            confKeyPatterns.add(Constants.JobConfiguration.PIG_JOB);
-            confKeyPatterns.add(Constants.JobConfiguration.SCOOBI_JOB);
-
-            String jobNameKey = jhfAppConf.getString("MRConfigureKeys.jobNameKey");
-            builder.setJobNameKey(jobNameKey);
-
-            for (String key : confKeyPatterns) {
-                builder.includeJobKeyPatterns(Pattern.compile(key));
-            }
-            JobHistoryContentFilter filter = builder.build();
-
-            //3. init topology
-            TopologyBuilder topologyBuilder = new TopologyBuilder();
-            String topologyName = "mrHistoryJobTopology";
-            if (jhfAppConf.hasPath("envContextConfig.topologyName")) {
-                topologyName = jhfAppConf.getString("envContextConfig.topologyName");
-            }
-            String spoutName = "mrHistoryJobExecutor";
-            int parallelism = jhfAppConf.getInt("envContextConfig.parallelismConfig." + spoutName);
-            int tasks = jhfAppConf.getInt("envContextConfig.tasks." + spoutName);
-            if (parallelism > tasks) {
-                parallelism = tasks;
-            }
-            topologyBuilder.setSpout(
-                    spoutName,
-                    new JobHistorySpout(filter, jhfConfigManager),
-                    parallelism
-            ).setNumTasks(tasks);
-
-            Config config = new backtype.storm.Config();
-            config.setNumWorkers(jhfAppConf.getInt("envContextConfig.workers"));
-            config.put(Config.TOPOLOGY_DEBUG, true);
-            if (!jhfConfigManager.getEnv().equals("local")) {
-                //cluster mode
-                //parse conf here
-                StormSubmitter.submitTopology(topologyName, config, topologyBuilder.createTopology());
-            } else {
-                //local mode
-                LocalCluster cluster = new LocalCluster();
-                cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
+        new MRHistoryJobApplication().run(args);
     }
-}
+}
\ No newline at end of file
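
For reference, the removed main() above capped the spout's parallelism hint at
the configured task count before wiring the topology; in Storm the hint is the
executor count and setNumTasks() fixes the task count, so a hint above the task
count would only create idle executors. A minimal sketch of that clamp (names
are illustrative):

    public class ParallelismClampSketch {
        // Storm spreads `tasks` task instances across `hint` executors, so the
        // hint is capped at the task count.
        static int clampParallelism(int hint, int tasks) {
            return Math.min(hint, tasks);
        }

        public static void main(String[] args) {
            System.out.println(clampParallelism(8, 4)); // prints 4
            System.out.println(clampParallelism(2, 4)); // prints 2
        }
    }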

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
deleted file mode 100644
index c99891b..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JHFConfigManager.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.common;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.common.config.ConfigOptionParser;
-import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
-import org.apache.eagle.jpm.util.JobIdPartitioner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-
-public class JHFConfigManager implements Serializable {
-    private static final Logger LOG = LoggerFactory.getLogger(JHFConfigManager.class);
-
-    private static final String JOB_CONFIGURE_KEY_CONF_FILE = "JobConfigKeys.conf";
-
-    public String getEnv() {
-        return env;
-    }
-    private String env;
-
-    public ZKStateConfig getZkStateConfig() { return zkStateConfig; }
-    private ZKStateConfig zkStateConfig;
-
-    public JobHistoryEndpointConfig getJobHistoryEndpointConfig() { return jobHistoryEndpointConfig; }
-    private JobHistoryEndpointConfig jobHistoryEndpointConfig;
-
-    public ControlConfig getControlConfig() { return controlConfig; }
-    private ControlConfig controlConfig;
-
-    public JobExtractorConfig getJobExtractorConfig() { return jobExtractorConfig; }
-    private JobExtractorConfig jobExtractorConfig;
-
-    public EagleServiceConfig getEagleServiceConfig() {
-        return eagleServiceConfig;
-    }
-    private EagleServiceConfig eagleServiceConfig;
-
-    public Config getConfig() {
-        return config;
-    }
-    private Config config;
-
-    public static class ZKStateConfig implements Serializable {
-        public String zkQuorum;
-        public String zkRoot;
-        public int zkSessionTimeoutMs;
-        public int zkRetryTimes;
-        public int zkRetryInterval;
-        public String zkPort;
-    }
-
-    public static class JobHistoryEndpointConfig implements Serializable {
-        public String nnEndpoint;
-        public String basePath;
-        public boolean pathContainsJobTrackerName;
-        public String jobTrackerName;
-        public String principal;
-        public String keyTab;
-    }
-
-    public static class ControlConfig implements Serializable {
-        public boolean dryRun;
-        public Class<? extends JobIdPartitioner> partitionerCls;
-        public boolean zeroBasedMonth;
-        public String timeZone;
-    }
-
-    public static class JobExtractorConfig implements Serializable {
-        public String site;
-        public String mrVersion;
-        public int readTimeoutSeconds;
-    }
-
-    public static class EagleServiceConfig implements Serializable {
-        public String eagleServiceHost;
-        public int eagleServicePort;
-        public String username;
-        public String password;
-    }
-
-    private static JHFConfigManager manager = new JHFConfigManager();
-
-    /**
-     * As this is singleton object and constructed while this class is being initialized,
-     * so any exception within this constructor will be wrapped with java.lang.ExceptionInInitializerError.
-     * And this is unrecoverable and hard to troubleshooting.
-     */
-    private JHFConfigManager() {
-        this.zkStateConfig = new ZKStateConfig();
-        this.jobHistoryEndpointConfig = new JobHistoryEndpointConfig();
-        this.controlConfig = new ControlConfig();
-        this.jobExtractorConfig = new JobExtractorConfig();
-        this.eagleServiceConfig = new EagleServiceConfig();
-    }
-
-    public static JHFConfigManager getInstance(String []args) {
-        manager.init(args);
-        return manager;
-    }
-
-    /**
-     * read configuration file and load hbase config etc
-     */
-    private void init(String[] args) {
-        // TODO: Probably we can remove the properties file path check in future
-        try {
-            LOG.info("Loading from configuration file");
-            this.config = new ConfigOptionParser().load(args);
-        } catch (Exception e) {
-            LOG.error("failed to load config");
-        }
-
-        this.env = config.getString("envContextConfig.env");
-
-        //parse eagle job extractor
-        this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
-        this.jobExtractorConfig.mrVersion = config.getString("jobExtractorConfig.mrVersion");
-        this.jobExtractorConfig.readTimeoutSeconds = config.getInt("jobExtractorConfig.readTimeOutSeconds");
-        //parse eagle zk
-        this.zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
-        this.zkStateConfig.zkPort = config.getString("dataSourceConfig.zkPort");
-        this.zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
-        this.zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
-        this.zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
-        this.zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
-
-        //parse job history endpoint
-        this.jobHistoryEndpointConfig.basePath = config.getString("dataSourceConfig.basePath");
-        this.jobHistoryEndpointConfig.jobTrackerName = config.getString("dataSourceConfig.jobTrackerName");
-        this.jobHistoryEndpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint");
-        this.jobHistoryEndpointConfig.pathContainsJobTrackerName = config.getBoolean("dataSourceConfig.pathContainsJobTrackerName");
-        this.jobHistoryEndpointConfig.principal = config.getString("dataSourceConfig.principal");
-        this.jobHistoryEndpointConfig.keyTab = config.getString("dataSourceConfig.keytab");
-
-        //parse control config
-        this.controlConfig.dryRun = config.getBoolean("dataSourceConfig.dryRun");
-        try {
-            this.controlConfig.partitionerCls = (Class<? extends JobIdPartitioner>) Class.forName(config.getString("dataSourceConfig.partitionerCls"));
-            assert this.controlConfig.partitionerCls != null;
-        } catch (Exception e) {
-            LOG.warn("can not initialize partitioner class, use org.apache.eagle.jpm.util.DefaultJobIdPartitioner", e);
-            this.controlConfig.partitionerCls = DefaultJobIdPartitioner.class;
-        } finally {
-            LOG.info("Loaded partitioner class: {}",this.controlConfig.partitionerCls);
-        }
-        this.controlConfig.zeroBasedMonth = config.getBoolean("dataSourceConfig.zeroBasedMonth");
-        this.controlConfig.timeZone = config.getString("dataSourceConfig.timeZone");
-
-        // parse eagle service endpoint
-        this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
-        String port = config.getString("eagleProps.eagleService.port");
-        this.eagleServiceConfig.eagleServicePort = (port == null ? 8080 : Integer.parseInt(port));
-        this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
-        this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
-
-        LOG.info("Successfully initialized JHFConfigManager");
-        LOG.info("env: " + this.env);
-        LOG.info("zookeeper.quorum: " + this.zkStateConfig.zkQuorum);
-        LOG.info("zookeeper.property.clientPort: " + this.zkStateConfig.zkPort);
-        LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
-        LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
index 6f85149..aeb35fd 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.crawler;
 
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.parser.JHFParserBase;
 import org.apache.eagle.jpm.mr.history.parser.JHFParserFactory;
 import org.slf4j.Logger;
@@ -33,16 +33,16 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback {
 
 
     private JobHistoryContentFilter m_filter;
-    private JHFConfigManager m_configManager;
+    private MRHistoryJobConfig m_configManager;
 
-    public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, JHFConfigManager configManager, EagleOutputCollector eagleCollector) {
+    public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, MRHistoryJobConfig configManager, EagleOutputCollector eagleCollector) {
         this.m_filter = filter;
         this.m_configManager = configManager;
     }
 
     @Override
     public void onInputStream(InputStream jobFileInputStream, org.apache.hadoop.conf.Configuration conf) throws Exception {
-        final JHFConfigManager.JobExtractorConfig jobExtractorConfig = m_configManager.getJobExtractorConfig();
+        final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = m_configManager.getJobExtractorConfig();
         @SuppressWarnings("serial")
         Map<String, String> baseTags = new HashMap<String, String>() { {
             put("site", jobExtractorConfig.site);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index d3e1816..52bd8ea 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -19,7 +19,7 @@
 package org.apache.eagle.jpm.mr.history.crawler;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.util.JobIdFilter;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateLCM;
 import org.slf4j.Logger;
@@ -61,8 +61,8 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
     private int m_partitionId;
     private TimeZone m_timeZone;
 
-    public JHFCrawlerDriverImpl(JHFConfigManager.JobHistoryEndpointConfig jobHistoryConfig,
-                                JHFConfigManager.ControlConfig controlConfig, JHFInputStreamCallback reader,
+    public JHFCrawlerDriverImpl(MRHistoryJobConfig.JobHistoryEndpointConfig jobHistoryConfig,
+                                MRHistoryJobConfig.ControlConfig controlConfig, JHFInputStreamCallback reader,
                                 JobHistoryZKStateLCM zkStateLCM,
                                 JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception {
         this.m_zeroBasedMonth = controlConfig.zeroBasedMonth;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
index 3b303fd..cfd5994 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
@@ -33,7 +33,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import org.apache.eagle.jpm.util.HDFSUtil;
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager.JobHistoryEndpointConfig;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
 
 public class JobHistoryDAOImpl extends AbstractJobHistoryDAO {
     private static final Logger LOG = LoggerFactory.getLogger(JobHistoryDAOImpl.class);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java
index bdaedd4..892c2ea 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityCreationListener.java
@@ -22,16 +22,18 @@ package org.apache.eagle.jpm.mr.history.parser;
 import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
 
 /**
- * generalizing this listener would decouple entity creation and entity handling, also will help unit testing
- * @author yonzhang
+ * Generalizing this listener decouples entity creation from entity handling, and also helps unit testing.
  *
+ * @author yonzhang
  */
 public interface HistoryJobEntityCreationListener {
     /**
      * job entity created event
+     *
      * @param entity
      */
     void jobEntityCreated(JobBaseAPIEntity entity) throws Exception;
+
     /**
      * for streaming processing, flush would help commit the last several entities
      */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java
index ae6b5c9..a803c6d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/HistoryJobEntityLifecycleListener.java
@@ -22,13 +22,12 @@ import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
 
 public interface HistoryJobEntityLifecycleListener extends HistoryJobEntityCreationListener {
     /**
-     * job entity created event
-     * @param entity
+     * job entity created event.
      */
     void jobEntityCreated(JobBaseAPIEntity entity) throws Exception;
 
     /**
-     * Job finished
+     * Job finished.
      */
     void jobFinish();
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java
index d454c31..652eaf8 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/ImportException.java
@@ -18,8 +18,6 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-/**
- */
 public class ImportException extends RuntimeException {
     private static final long serialVersionUID = -706778307046285820L;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index 9992690..82e305a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,14 +18,13 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.historyentity.JobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.eagle.jpm.mr.historyentity.*;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.JobNameNormalization;
 import org.apache.eagle.jpm.util.MRJobTagName;
 import org.apache.eagle.jpm.util.jobcounter.JobCounters;
-import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.jobhistory.EventType;
 import org.slf4j.Logger;
@@ -75,10 +74,18 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     private long sumReduceTaskDuration;
 
     public Constants.JobType fetchJobType(Configuration config) {
-        if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) { return Constants.JobType.CASCADING; }
-        if (config.get(Constants.JobConfiguration.HIVE_JOB) != null) { return Constants.JobType.HIVE; }
-        if (config.get(Constants.JobConfiguration.PIG_JOB) != null) { return Constants.JobType.PIG; }
-        if (config.get(Constants.JobConfiguration.SCOOBI_JOB) != null) {return Constants.JobType.SCOOBI; }
+        if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) {
+            return Constants.JobType.CASCADING;
+        }
+        if (config.get(Constants.JobConfiguration.HIVE_JOB) != null) {
+            return Constants.JobType.HIVE;
+        }
+        if (config.get(Constants.JobConfiguration.PIG_JOB) != null) {
+            return Constants.JobType.PIG;
+        }
+        if (config.get(Constants.JobConfiguration.SCOOBI_JOB) != null) {
+            return Constants.JobType.SCOOBI;
+        }
         return Constants.JobType.NOTAVALIABLE;
     }
 
@@ -86,6 +93,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
      * baseTags stores the basic tag name values which might be used for persisting various entities
      * baseTags includes: cluster, datacenter and jobName
      * baseTags are used for all job/task related entities
+     *
      * @param baseTags
      */
     public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
@@ -120,7 +128,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
         this.sumReduceTaskDuration = 0l;
     }
 
-    public void register(HistoryJobEntityLifecycleListener lifecycleListener){
+    public void register(HistoryJobEntityLifecycleListener lifecycleListener) {
         this.jobEntityLifecycleListeners.add(lifecycleListener);
     }
 
@@ -132,7 +140,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
         }
         try {
             flush();
-        } catch(Exception ex) {
+        } catch (Exception ex) {
             throw new IOException(ex);
         }
     }
@@ -146,8 +154,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     }
 
     /**
-       * @param id
-       */
+     * @param id
+     */
     private void setJobID(String id) {
         this.m_jobId = id;
     }
@@ -157,128 +165,128 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     }
 
     protected void handleJob(EventType eventType, Map<Keys, String> values, Object totalCounters) throws Exception {
-       String id = values.get(Keys.JOBID);
-
-       if (m_jobId == null) {
-           setJobID(id);
-       } else if (!m_jobId.equals(id)) {
-           String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobId + "'";
-           LOG.error(msg);
-           throw new ImportException(msg);
-       }
-
-       if (values.get(Keys.SUBMIT_TIME) != null) {  // job submitted
-           m_jobSubmitEventEntity.setTimestamp(Long.valueOf(values.get(Keys.SUBMIT_TIME)));
-           m_user = values.get(Keys.USER);
-           m_queueName = values.get(Keys.JOB_QUEUE);
-           m_jobName = values.get(Keys.JOBNAME);
-
-           // If given job name then use it as norm job name, otherwise use eagle JobNameNormalization rule to generate.
-           String jobDefId = null;
-           if(configuration != null ) {
-               jobDefId = configuration.get(m_filter.getJobNameKey());
-           }
-
-           if(jobDefId == null) {
-               m_jobDefId = JobNameNormalization.getInstance().normalize(m_jobName);
-           } else {
-               LOG.debug("Got JobDefId from job configuration for " + id + ": " + jobDefId);
-               m_jobDefId = jobDefId;
-           }
-
-           LOG.info("JobDefId of " + id + ": " + m_jobDefId);
-
-           m_jobSubmitEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
-           m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
-           m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name());
-           m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
-           m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
-           m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
-           entityCreated(m_jobSubmitEventEntity);
-       } else if(values.get(Keys.LAUNCH_TIME) != null) {  // job launched
-           m_jobLaunchEventEntity.setTimestamp(Long.valueOf(values.get(Keys.LAUNCH_TIME)));
-           m_jobLauchTime = m_jobLaunchEventEntity.getTimestamp();
-           m_jobLaunchEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
-           m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
-           m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name());
-           m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
-           m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
-           m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
-           m_numTotalMaps = Integer.valueOf(values.get(Keys.TOTAL_MAPS));
-           m_numTotalReduces = Integer.valueOf(values.get(Keys.TOTAL_REDUCES));
-           entityCreated(m_jobLaunchEventEntity);
-       } else if(values.get(Keys.FINISH_TIME) != null) {  // job finished
-           m_jobFinishEventEntity.setTimestamp(Long.valueOf(values.get(Keys.FINISH_TIME)));
-           m_jobFinishEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
-           m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
-           m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS));
-           m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
-           m_jobFinishEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
-           m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
-           entityCreated(m_jobFinishEventEntity);
-
-           // populate jobExecutionEntity entity
-           m_jobExecutionEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
-           m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
-           m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
-           m_jobExecutionEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
-           m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), m_queueName);
-           m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(),this.m_jobType);
-
-           m_jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS));
-           m_jobExecutionEntity.setStartTime(m_jobLaunchEventEntity.getTimestamp());
-           m_jobExecutionEntity.setEndTime(m_jobFinishEventEntity.getTimestamp());
-           m_jobExecutionEntity.setDurationTime(m_jobExecutionEntity.getEndTime() - m_jobExecutionEntity.getStartTime());
-           m_jobExecutionEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp());
-           m_jobExecutionEntity.setSubmissionTime(m_jobSubmitEventEntity.getTimestamp());
-           if (values.get(Keys.FAILED_MAPS) != null) {
-               // for Artemis
-               m_jobExecutionEntity.setNumFailedMaps(Integer.valueOf(values.get(Keys.FAILED_MAPS)));
-           }
-           if (values.get(Keys.FAILED_REDUCES) != null) {
-               // for Artemis
-               m_jobExecutionEntity.setNumFailedReduces(Integer.valueOf(values.get(Keys.FAILED_REDUCES)));
-           }
-           m_jobExecutionEntity.setNumFinishedMaps(Integer.valueOf(values.get(Keys.FINISHED_MAPS)));
-           m_jobExecutionEntity.setNumFinishedReduces(Integer.valueOf(values.get(Keys.FINISHED_REDUCES)));
-           m_jobExecutionEntity.setNumTotalMaps(m_numTotalMaps);
-           m_jobExecutionEntity.setNumTotalReduces(m_numTotalReduces);
-           if (values.get(Keys.COUNTERS) != null || totalCounters != null) {
-               JobCounters jobCounters = parseCounters(totalCounters);
-               m_jobExecutionEntity.setJobCounters(jobCounters);
-               if (jobCounters.getCounters().containsKey(Constants.JOB_COUNTER)) {
-                   Map<String, Long> counters = jobCounters.getCounters().get(Constants.JOB_COUNTER);
-                   if (counters.containsKey(Constants.JobCounter.DATA_LOCAL_MAPS.toString())) {
-                       m_jobExecutionEntity.setDataLocalMaps(counters.get(Constants.JobCounter.DATA_LOCAL_MAPS.toString()).intValue());
-                   }
-
-                   if (counters.containsKey(Constants.JobCounter.RACK_LOCAL_MAPS.toString())) {
-                       m_jobExecutionEntity.setRackLocalMaps(counters.get(Constants.JobCounter.RACK_LOCAL_MAPS.toString()).intValue());
-                   }
-
-                   if (counters.containsKey(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString())) {
-                       m_jobExecutionEntity.setTotalLaunchedMaps(counters.get(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString()).intValue());
-                   }
-               }
-
-               if (m_jobExecutionEntity.getTotalLaunchedMaps() > 0) {
-                   m_jobExecutionEntity.setDataLocalMapsPercentage(m_jobExecutionEntity.getDataLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps());
-                   m_jobExecutionEntity.setRackLocalMapsPercentage(m_jobExecutionEntity.getRackLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps());
-               }
-           }
-           m_jobExecutionEntity.setAvgMapTaskDuration(this.sumMapTaskDuration * 1.0 / m_numTotalMaps);
-           if (m_numTotalReduces == 0) {
-               m_jobExecutionEntity.setMaxReduceTaskDuration(0);
-               m_jobExecutionEntity.setAvgReduceTaskDuration(0);
-           } else {
-               m_jobExecutionEntity.setAvgReduceTaskDuration(this.sumReduceTaskDuration * 1.0 / m_numTotalReduces);
-           }
-           entityCreated(m_jobExecutionEntity);
-       }
+        String id = values.get(Keys.JOBID);
+
+        if (m_jobId == null) {
+            setJobID(id);
+        } else if (!m_jobId.equals(id)) {
+            String msg = "Current job ID '" + id + "' does not match previously stored value '" + m_jobId + "'";
+            LOG.error(msg);
+            throw new ImportException(msg);
+        }
+
+        if (values.get(Keys.SUBMIT_TIME) != null) {  // job submitted
+            m_jobSubmitEventEntity.setTimestamp(Long.valueOf(values.get(Keys.SUBMIT_TIME)));
+            m_user = values.get(Keys.USER);
+            m_queueName = values.get(Keys.JOB_QUEUE);
+            m_jobName = values.get(Keys.JOBNAME);
+
+            // If a job name is given, use it as the normalized job name; otherwise generate one with the Eagle JobNameNormalization rule.
+            String jobDefId = null;
+            if (configuration != null) {
+                jobDefId = configuration.get(m_filter.getJobNameKey());
+            }
+
+            if (jobDefId == null) {
+                m_jobDefId = JobNameNormalization.getInstance().normalize(m_jobName);
+            } else {
+                LOG.debug("Got JobDefId from job configuration for " + id + ": " + jobDefId);
+                m_jobDefId = jobDefId;
+            }
+
+            LOG.info("JobDefId of " + id + ": " + m_jobDefId);
+
+            m_jobSubmitEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+            m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+            m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.SUBMITTED.name());
+            m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+            m_jobSubmitEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+            m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType);
+            entityCreated(m_jobSubmitEventEntity);
+        } else if (values.get(Keys.LAUNCH_TIME) != null) {  // job launched
+            m_jobLaunchEventEntity.setTimestamp(Long.valueOf(values.get(Keys.LAUNCH_TIME)));
+            m_jobLauchTime = m_jobLaunchEventEntity.getTimestamp();
+            m_jobLaunchEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+            m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+            m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), EagleJobStatus.LAUNCHED.name());
+            m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+            m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+            m_jobLaunchEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType);
+            m_numTotalMaps = Integer.valueOf(values.get(Keys.TOTAL_MAPS));
+            m_numTotalReduces = Integer.valueOf(values.get(Keys.TOTAL_REDUCES));
+            entityCreated(m_jobLaunchEventEntity);
+        } else if (values.get(Keys.FINISH_TIME) != null) {  // job finished
+            m_jobFinishEventEntity.setTimestamp(Long.valueOf(values.get(Keys.FINISH_TIME)));
+            m_jobFinishEventEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+            m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+            m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_STATUS.toString(), values.get(Keys.JOB_STATUS));
+            m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+            m_jobFinishEventEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+            m_jobFinishEventEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType);
+            entityCreated(m_jobFinishEventEntity);
+
+            // populate jobExecutionEntity entity
+            m_jobExecutionEntity.getTags().put(MRJobTagName.USER.toString(), m_user);
+            m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_ID.toString(), m_jobId);
+            m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_NAME.toString(), m_jobName);
+            m_jobExecutionEntity.getTags().put(MRJobTagName.JOD_DEF_ID.toString(), m_jobDefId);
+            m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), m_queueName);
+            m_jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.m_jobType);
+
+            m_jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS));
+            m_jobExecutionEntity.setStartTime(m_jobLaunchEventEntity.getTimestamp());
+            m_jobExecutionEntity.setEndTime(m_jobFinishEventEntity.getTimestamp());
+            m_jobExecutionEntity.setDurationTime(m_jobExecutionEntity.getEndTime() - m_jobExecutionEntity.getStartTime());
+            m_jobExecutionEntity.setTimestamp(m_jobLaunchEventEntity.getTimestamp());
+            m_jobExecutionEntity.setSubmissionTime(m_jobSubmitEventEntity.getTimestamp());
+            if (values.get(Keys.FAILED_MAPS) != null) {
+                // for Artemis
+                m_jobExecutionEntity.setNumFailedMaps(Integer.valueOf(values.get(Keys.FAILED_MAPS)));
+            }
+            if (values.get(Keys.FAILED_REDUCES) != null) {
+                // for Artemis
+                m_jobExecutionEntity.setNumFailedReduces(Integer.valueOf(values.get(Keys.FAILED_REDUCES)));
+            }
+            m_jobExecutionEntity.setNumFinishedMaps(Integer.valueOf(values.get(Keys.FINISHED_MAPS)));
+            m_jobExecutionEntity.setNumFinishedReduces(Integer.valueOf(values.get(Keys.FINISHED_REDUCES)));
+            m_jobExecutionEntity.setNumTotalMaps(m_numTotalMaps);
+            m_jobExecutionEntity.setNumTotalReduces(m_numTotalReduces);
+            if (values.get(Keys.COUNTERS) != null || totalCounters != null) {
+                JobCounters jobCounters = parseCounters(totalCounters);
+                m_jobExecutionEntity.setJobCounters(jobCounters);
+                if (jobCounters.getCounters().containsKey(Constants.JOB_COUNTER)) {
+                    Map<String, Long> counters = jobCounters.getCounters().get(Constants.JOB_COUNTER);
+                    if (counters.containsKey(Constants.JobCounter.DATA_LOCAL_MAPS.toString())) {
+                        m_jobExecutionEntity.setDataLocalMaps(counters.get(Constants.JobCounter.DATA_LOCAL_MAPS.toString()).intValue());
+                    }
+
+                    if (counters.containsKey(Constants.JobCounter.RACK_LOCAL_MAPS.toString())) {
+                        m_jobExecutionEntity.setRackLocalMaps(counters.get(Constants.JobCounter.RACK_LOCAL_MAPS.toString()).intValue());
+                    }
+
+                    if (counters.containsKey(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString())) {
+                        m_jobExecutionEntity.setTotalLaunchedMaps(counters.get(Constants.JobCounter.TOTAL_LAUNCHED_MAPS.toString()).intValue());
+                    }
+                }
+
+                if (m_jobExecutionEntity.getTotalLaunchedMaps() > 0) {
+                    m_jobExecutionEntity.setDataLocalMapsPercentage(m_jobExecutionEntity.getDataLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps());
+                    m_jobExecutionEntity.setRackLocalMapsPercentage(m_jobExecutionEntity.getRackLocalMaps() * 1.0 / m_jobExecutionEntity.getTotalLaunchedMaps());
+                }
+            }
+            m_jobExecutionEntity.setAvgMapTaskDuration(this.sumMapTaskDuration * 1.0 / m_numTotalMaps);
+            if (m_numTotalReduces == 0) {
+                m_jobExecutionEntity.setMaxReduceTaskDuration(0);
+                m_jobExecutionEntity.setAvgReduceTaskDuration(0);
+            } else {
+                m_jobExecutionEntity.setAvgReduceTaskDuration(this.sumReduceTaskDuration * 1.0 / m_numTotalReduces);
+            }
+            entityCreated(m_jobExecutionEntity);
+        }
     }
 
     private void entityCreated(JobBaseAPIEntity entity) throws Exception {
-        for (HistoryJobEntityLifecycleListener lifecycleListener: this.jobEntityLifecycleListeners) {
+        for (HistoryJobEntityLifecycleListener lifecycleListener : this.jobEntityLifecycleListeners) {
             lifecycleListener.jobEntityCreated(entity);
         }
 
@@ -295,11 +303,12 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     protected abstract JobCounters parseCounters(Object value) throws IOException;
 
     /**
-      * for one task ID, it has several sequential task events, i.e.
-      * task_start -> task_attempt_start -> task_attempt_finish -> task_attempt_start -> task_attempt_finish -> ... -> task_end
-      * @param values
-      * @throws IOException
-      */
+     * For one task ID, there are several sequential task events, i.e.
+     * task_start -> task_attempt_start -> task_attempt_finish -> task_attempt_start -> task_attempt_finish -> ... -> task_end
+     *
+     * @param values
+     * @throws IOException
+     */
     @SuppressWarnings("serial")
     protected void handleTask(RecordTypes recType, EventType eventType, final Map<Keys, String> values, Object counters) throws Exception {
         String taskAttemptID = values.get(Keys.TASK_ATTEMPT_ID);
@@ -308,7 +317,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
         final String taskType = values.get(Keys.TASK_TYPE);
         final String taskID = values.get(Keys.TASKID);
 
-        Map<String, String> taskBaseTags = new HashMap<String, String>(){{
+        Map<String, String> taskBaseTags = new HashMap<String, String>() {{
             put(MRJobTagName.TASK_TYPE.toString(), taskType);
             put(MRJobTagName.USER.toString(), m_user);
             //put(MRJobTagName.JOB_NAME.toString(), _jobName);
@@ -402,11 +411,12 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
         Map<String, String> prop = new TreeMap<>();
 
         if (m_filter.acceptJobConfFile()) {
-            Iterator<Map.Entry<String, String> > iter = configuration.iterator();
+            Iterator<Map.Entry<String, String>> iter = configuration.iterator();
             while (iter.hasNext()) {
                 String key = iter.next().getKey();
-                if (included(key) && !excluded(key))
+                if (included(key) && !excluded(key)) {
                     prop.put(key, configuration.get(key));
+                }
             }
         }
 
@@ -442,15 +452,17 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
                     break;
                 }
             }
-            if (!matched)
+            if (!matched) {
                 return false;
+            }
         }
         return true;
     }
 
     private boolean included(String key) {
-        if (m_filter.getJobConfKeyInclusionPatterns() == null)
+        if (m_filter.getJobConfKeyInclusionPatterns() == null) {
             return true;
+        }
         for (Pattern p : m_filter.getJobConfKeyInclusionPatterns()) {
             Matcher m = p.matcher(key);
             if (m.matches()) {
@@ -462,13 +474,15 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     }
 
     private boolean excluded(String key) {
-        if (m_filter.getJobConfKeyExclusionPatterns() == null)
+        if (m_filter.getJobConfKeyExclusionPatterns() == null) {
             return false;
+        }
         for (Pattern p : m_filter.getJobConfKeyExclusionPatterns()) {
             Matcher m = p.matcher(key);
-            if (m.matches())
+            if (m.matches()) {
                 return true;
+            }
         }
         return false;
     }
-}
+}
\ No newline at end of file
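
One detail worth noting in handleJob() above: the reduce-side average guards
against a zero task count, but the map-side average does not, so with
m_numTotalMaps == 0 the floating-point division yields NaN (or Infinity)
instead of throwing. A guarded variant, sketched:

    public class AvgDurationSketch {
        // Returns 0 for an empty task set instead of NaN/Infinity.
        static double avgDuration(long sumDuration, int taskCount) {
            return taskCount == 0 ? 0.0 : sumDuration * 1.0 / taskCount;
        }

        public static void main(String[] args) {
            System.out.println(avgDuration(3000L, 3)); // prints 1000.0
            System.out.println(avgDuration(0L, 0));    // prints 0.0
        }
    }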

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
index 654f63f..6932dad 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
@@ -19,9 +19,9 @@
 package org.apache.eagle.jpm.mr.history.parser;
 
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.util.jobcounter.JobCounters;
-import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.CounterGroup;
@@ -37,8 +37,8 @@ import java.util.Map;
 
 /**
 * Listener holds all information related to one whole job history file, so it's stateful and does not support multithreading.
- * @author yonzhang
  *
+ * @author yonzhang
  */
 public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer1PerLineListener {
     private static final Logger logger = LoggerFactory.getLogger(JHFMRVer1EventReader.class);
@@ -47,6 +47,7 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
      * baseTags stores the basic tag name values which might be used for persisting various entities
      * baseTags includes: cluster, datacenter and jobName
      * baseTags are used for all job/task related entities
+     *
      * @param baseTags
      */
     public JHFMRVer1EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
@@ -55,7 +56,7 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
 
     @Override
     public void handle(RecordTypes recType, Map<Keys, String> values)
-          throws Exception {
+        throws Exception {
         switch (recType) {
             case Job:
                 handleJob(null, values, values.get(Keys.COUNTERS));
@@ -76,11 +77,12 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
                 ;
         }
     }
-     
+
     private void ensureRackHostnameAfterAttemptFinish(Map<Keys, String> values) {
         // only care about attempt finish
-        if (values.get(Keys.FINISH_TIME) == null)
+        if (values.get(Keys.FINISH_TIME) == null) {
             return;
+        }
         String hostname = null;
         String rack = null;
         // we get rack/hostname based on task's status
@@ -92,24 +94,24 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
             hostname = tmp[tmp.length - 1];
             rack = tmp[tmp.length - 2];
             m_host2RackMapping.put(hostname, rack);
-        } else if(values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.KILLED.name()) || values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.FAILED.name())) {
+        } else if (values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.KILLED.name()) || values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.FAILED.name())) {
             hostname = values.get(Keys.HOSTNAME);
             // make every effort to get RACK information
             hostname = (hostname == null) ? "" : hostname;
             rack = m_host2RackMapping.get(hostname);
         }
-          
+
         values.put(Keys.HOSTNAME, hostname);
         values.put(Keys.RACK, rack);
     }
-    
+
     @Override
     protected JobCounters parseCounters(Object value) throws IOException {
         JobCounters jc = new JobCounters();
         Map<String, Map<String, Long>> groups = new HashMap<String, Map<String, Long>>();
-        Counters counters = new Counters(); 
+        Counters counters = new Counters();
         try {
-            CountersStrings.parseEscapedCompactString((String)value, counters);
+            CountersStrings.parseEscapedCompactString((String) value, counters);
         } catch (Exception ex) {
             logger.error("can not parse job history", ex);
             throw new IOException(ex);
@@ -118,7 +120,7 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
         while (it.hasNext()) {
             CounterGroup cg = it.next();
 
-           // hardcoded to exclude business level counters
+            // hardcoded to exclude business level counters
             if (!cg.getName().equals("org.apache.hadoop.mapreduce.FileSystemCounter")
                 && !cg.getName().equals("org.apache.hadoop.mapreduce.TaskCounter")
                 && !cg.getName().equals("org.apache.hadoop.mapreduce.JobCounter")
@@ -128,7 +130,9 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
                 && !cg.getName().equals("org.apache.hadoop.mapred.Task$Counter")                   // for artemis
                 && !cg.getName().equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormat$Counter")  // for artemis
                 && !cg.getName().equals("org.apache.hadoop.mapreduce.lib.input.FileOutputFormat$Counter")
-            ) continue;
+                ) {
+                continue;
+            }
 
             groups.put(cg.getName(), new HashMap<String, Long>());
             Map<String, Long> counterValues = groups.get(cg.getName());
@@ -143,7 +147,7 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
         jc.setCounters(groups);
         return jc;
     }
-    
+
     public JobExecutionAPIEntity jobExecution() {
         return m_jobExecutionEntity;
     }
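
parseCounters() above flattens Hadoop counters into Map<group, Map<counter,
value>> after filtering out business-level groups. The core iteration, sketched
without the group filter and assuming the iterable org.apache.hadoop.mapreduce
counter API:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.CounterGroup;
    import org.apache.hadoop.mapreduce.Counters;

    public class CounterFlattenSketch {
        static Map<String, Map<String, Long>> flatten(Counters counters) {
            Map<String, Map<String, Long>> groups = new HashMap<>();
            for (CounterGroup cg : counters) {          // Counters iterates its groups
                Map<String, Long> values = new HashMap<>();
                for (Counter c : cg) {                  // a group iterates its counters
                    values.put(c.getName(), c.getValue());
                }
                groups.put(cg.getName(), values);
            }
            return groups;
        }

        public static void main(String[] args) {
            Counters counters = new Counters();
            counters.findCounter("ExampleGroup", "EXAMPLE_COUNTER").increment(5);
            System.out.println(flatten(counters)); // prints {ExampleGroup={EXAMPLE_COUNTER=5}}
        }
    }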

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
index bb08ef0..ab59a41 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
@@ -44,16 +44,18 @@ public class JHFMRVer1Parser implements JHFParserBase {
     static final String MAX_COUNTER_COUNT = "10000";
 
     private JHFMRVer1EventReader m_reader;
-    public JHFMRVer1Parser(JHFMRVer1EventReader reader){
+
+    public JHFMRVer1Parser(JHFMRVer1EventReader reader) {
         this.m_reader = reader;
     }
 
     /**
-      * Parses history file and invokes Listener.handle() for
-      * each line of history. It can be used for looking through history
-      * files for specific items without having to keep whole history in memory.
-      * @throws IOException
-      */
+     * Parses history file and invokes Listener.handle() for
+     * each line of history. It can be used for looking through history
+     * files for specific items without having to keep whole history in memory.
+     *
+     * @throws IOException
+     */
     @Override
     public void parse(InputStream in) throws Exception, ParseException {
         // set enough counter number as user may build more counters
@@ -68,7 +70,7 @@ public class JHFMRVer1Parser implements JHFParserBase {
 
             // Check if the file is empty
             if (line == null) {
-              return;
+                return;
             }
 
             // Get the information required for further processing
@@ -80,17 +82,17 @@ public class JHFMRVer1Parser implements JHFParserBase {
             do {
                 buf.append(line);
                 if (!line.trim().endsWith(lineDelim) || line.trim().endsWith(escapedLineDelim)) {
-                  buf.append("\n");
-                  continue;
+                    buf.append("\n");
+                    continue;
                 }
                 parseLine(buf.toString(), m_reader, isEscaped);
                 buf = new StringBuffer();
-            } while ((line = reader.readLine())!= null);
+            } while ((line = reader.readLine()) != null);
 
             // flush to tell listener that we have finished parsing
             logger.info("finish parsing job history file and close");
             m_reader.close();
-        } catch(Exception ex) {
+        } catch (Exception ex) {
             logger.error("can not parse correctly ", ex);
             throw ex;
         } finally {
@@ -104,17 +106,17 @@ public class JHFMRVer1Parser implements JHFParserBase {
         // extract the record type
         int idx = line.indexOf(' ');
         String recType = line.substring(0, idx);
-        String data = line.substring(idx+1, line.length());
+        String data = line.substring(idx + 1, line.length());
 
         Matcher matcher = pattern.matcher(data);
-        Map<Keys,String> parseBuffer = new HashMap<Keys, String>();
+        Map<Keys, String> parseBuffer = new HashMap<Keys, String>();
 
-        while(matcher.find()) {
+        while (matcher.find()) {
             String tuple = matcher.group(0);
-            String []parts = StringUtils.split(tuple, StringUtils.ESCAPE_CHAR, '=');
-            String value = parts[1].substring(1, parts[1].length() -1);
+            String[] parts = StringUtils.split(tuple, StringUtils.ESCAPE_CHAR, '=');
+            String value = parts[1].substring(1, parts[1].length() - 1);
             if (isEscaped) {
-              value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR, charsToEscape);
+                value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR, charsToEscape);
             }
             parseBuffer.put(Keys.valueOf(parts[0]), value);
         }
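
For context on parseLine() above: MR v1 job history records are single logical
lines of the form RecType KEY="value" KEY="value" ... A minimal sketch of
extracting the pairs (the regex here is illustrative, not the parser's actual
pattern, and it skips the escaping that parseLine handles):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class HistoryLineSketch {
        public static void main(String[] args) {
            String line = "Job JOBID=\"job_201501010000_0001\" SUBMIT_TIME=\"1420070400000\"";
            String data = line.substring(line.indexOf(' ') + 1);
            Pattern kv = Pattern.compile("(\\w+)=\"([^\"]*)\"");   // illustrative KEY="value" matcher
            Matcher m = kv.matcher(data);
            Map<String, String> parsed = new HashMap<>();
            while (m.find()) {
                parsed.put(m.group(1), m.group(2));
            }
            System.out.println(parsed); // both pairs, iteration order not guaranteed
        }
    }
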
@@ -131,141 +133,154 @@ public class JHFMRVer1Parser implements JHFParserBase {
         }
 
         parseBuffer.clear();
-      }
-
-      /**
-       * Manages job-history's meta information such as version etc.
-       * Helps in logging version information to the job-history and recover
-       * version information from the history.
-       */
-      static class MetaInfoManager implements JHFMRVer1PerLineListener {
-          private long version = 0L;
-          private KeyValuePair pairs = new KeyValuePair();
-
-          public void close() {
-          }
-          // Extract the version of the history that was used to write the history
-          public MetaInfoManager(String line) throws Exception, ParseException {
-              if (null != line) {
-                  // Parse the line
-                  parseLine(line, this, false);
-              }
-          }
-
-          // Get the line delimiter
-          char getLineDelim() {
-              if (version == 0) {
-                  return '"';
-              } else {
-                  return LINE_DELIMITER_CHAR;
-              }
-          }
-
-          // Checks if the values are escaped or not
-          boolean isValueEscaped() {
-              // Note that the values are not escaped in version 0
-              return version != 0;
-          }
-
-          public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException {
+    }
+
+    /**
+     * Manages job-history's meta information such as version etc.
+     * Helps in logging version information to the job-history and recover
+     * version information from the history.
+     */
+    static class MetaInfoManager implements JHFMRVer1PerLineListener {
+        private long version = 0L;
+        private KeyValuePair pairs = new KeyValuePair();
+
+        public void close() {
+        }
+
+        // Extract the version of the history that was used to write the history
+        public MetaInfoManager(String line) throws Exception, ParseException {
+            if (null != line) {
+                // Parse the line
+                parseLine(line, this, false);
+            }
+        }
+
+        // Get the line delimiter
+        char getLineDelim() {
+            if (version == 0) {
+                return '"';
+            } else {
+                return LINE_DELIMITER_CHAR;
+            }
+        }
+
+        // Checks if the values are escaped or not
+        boolean isValueEscaped() {
+            // Note that the values are not escaped in version 0
+            return version != 0;
+        }
+
+        public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException {
             // Check if the record is of type META
-              if (RecordTypes.Meta == recType) {
-                  pairs.handle(values);
-                  version = pairs.getLong(Keys.VERSION); // defaults to 0
-              }
-          }
-      }
-      
-      /**
-       * Base class contais utility stuff to manage types key value pairs with enums. 
-       */
-      static class KeyValuePair {
-          private Map<Keys, String> values = new HashMap<Keys, String>(); 
-
-          /**
-           * Get 'String' value for given key. Most of the places use Strings as 
-           * values so the default get' method returns 'String'.  This method never returns 
-           * null to ease on GUIs. if no value is found it returns empty string ""
-           * @param k 
-           * @return if null it returns empty string - "" 
-           */
-          public String get(Keys k) {
-              String s = values.get(k);
-              return s == null ? "" : s;
-          }
-          /**
-           * Convert value from history to int and return. 
-           * if no value is found it returns 0.
-           * @param k key 
-           */
-          public int getInt(Keys k) {
-              String s = values.get(k);
-              if (null != s){
-                  return Integer.parseInt(s);
-              }
-              return 0;
-          }
-          /**
-           * Convert value from history to int and return. 
-           * if no value is found it returns 0.
-           * @param k
-           */
-          public long getLong(Keys k) {
-              String s = values.get(k);
-              if (null != s){
-                  return Long.parseLong(s);
-              }
-              return 0;
-          }
-          /**
-           * Set value for the key. 
-           * @param k
-           * @param s
-           */
-          public void set(Keys k, String s) {
-              values.put(k, s);
-          }
-          /**
-           * Adds all values in the Map argument to its own values. 
-           * @param m
-           */
-          public void set(Map<Keys, String> m) {
-              values.putAll(m);
-          }
-          /**
-           * Reads values back from the history, input is same Map as passed to Listener by parseHistory().  
-           * @param values
-           */
-          public synchronized void handle(Map<Keys, String> values) {
-              set(values);
-          }
-          /**
-           * Returns Map containing all key-values. 
-           */
-          public Map<Keys, String> getValues() {
-              return values;
-          }
-      }
-      
-      /**
-       * Job history files contain key="value" pairs, where keys belong to this enum. 
-       * It acts as a global namespace for all keys. 
-       */
-      public static enum Keys { 
-          JOBTRACKERID,
-          START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME, 
-          LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, 
-          FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, 
-          ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, 
-          SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT, 
-          TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS,
-          VIEW_JOB, MODIFY_JOB, JOB_QUEUE, RACK,
-
-          UBERISED,SPLIT_LOCATIONS,FAILED_DUE_TO_ATTEMPT,MAP_FINISH_TIME,PORT,RACK_NAME,
-
-          //For Artemis
-          WORKFLOW_ID,WORKFLOW_NAME,WORKFLOW_NODE_NAME,WORKFLOW_ADJACENCIES,WORKFLOW_TAGS,
-          SHUFFLE_PORT,LOCALITY,AVATAAR,FAIL_REASON
-      }
+            if (RecordTypes.Meta == recType) {
+                pairs.handle(values);
+                version = pairs.getLong(Keys.VERSION); // defaults to 0
+            }
+        }
+    }
+
+    /**
+     * Base class containing utility methods to manage typed key-value pairs keyed by enums.
+     */
+    static class KeyValuePair {
+        private Map<Keys, String> values = new HashMap<Keys, String>();
+
+        /**
+         * Get the 'String' value for the given key. Most places use Strings as
+         * values, so the default 'get' method returns 'String'. To simplify GUI
+         * code this method never returns null: if no value is found it returns
+         * the empty string "".
+         *
+         * @param k the key to look up
+         * @return the value, or the empty string "" if none is found
+         */
+        public String get(Keys k) {
+            String s = values.get(k);
+            return s == null ? "" : s;
+        }
+
+        /**
+         * Convert value from history to int and return.
+         * If no value is found it returns 0.
+         *
+         * @param k key
+         */
+        public int getInt(Keys k) {
+            String s = values.get(k);
+            if (null != s) {
+                return Integer.parseInt(s);
+            }
+            return 0;
+        }
+
+        /**
+         * Convert value from history to long and return.
+         * If no value is found it returns 0.
+         *
+         * @param k key
+         */
+        public long getLong(Keys k) {
+            String s = values.get(k);
+            if (null != s) {
+                return Long.parseLong(s);
+            }
+            return 0;
+        }
+
+        /**
+         * Set value for the key.
+         *
+         * @param k
+         * @param s
+         */
+        public void set(Keys k, String s) {
+            values.put(k, s);
+        }
+
+        /**
+         * Adds all values in the Map argument to its own values.
+         *
+         * @param m
+         */
+        public void set(Map<Keys, String> m) {
+            values.putAll(m);
+        }
+
+        /**
+         * Reads values back from the history; the input is the same Map that parseHistory() passes to the Listener.
+         *
+         * @param values
+         */
+        public synchronized void handle(Map<Keys, String> values) {
+            set(values);
+        }
+
+        /**
+         * Returns the Map containing all key-value pairs.
+         */
+        public Map<Keys, String> getValues() {
+            return values;
+        }
+    }
+
+    /**
+     * Job history files contain key="value" pairs, where keys belong to this enum.
+     * It acts as a global namespace for all keys.
+     */
+    public static enum Keys {
+        JOBTRACKERID,
+        START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
+        LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
+        FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
+        ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
+        SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT,
+        TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS,
+        VIEW_JOB, MODIFY_JOB, JOB_QUEUE, RACK,
+
+        UBERISED, SPLIT_LOCATIONS, FAILED_DUE_TO_ATTEMPT, MAP_FINISH_TIME, PORT, RACK_NAME,
+
+        //For Artemis
+        WORKFLOW_ID, WORKFLOW_NAME, WORKFLOW_NODE_NAME, WORKFLOW_ADJACENCIES, WORKFLOW_TAGS,
+        SHUFFLE_PORT, LOCALITY, AVATAAR, FAIL_REASON
+    }
 }
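
Since most of the hunk above is indentation churn, it is easy to miss the KeyValuePair contract the javadoc describes: lookups never return null; a missing key yields "" from get() and 0 from getInt()/getLong(). A minimal standalone sketch of that behavior (class and enum members here are illustrative, since the real KeyValuePair is package-private inside JHFMRVer1Parser):

    import java.util.EnumMap;
    import java.util.Map;

    public class KeyValuePairSketch {
        enum Keys { JOBID, SUBMIT_TIME, TOTAL_MAPS }

        private final Map<Keys, String> values = new EnumMap<>(Keys.class);

        String get(Keys k) {                 // never null, to simplify GUI code
            String s = values.get(k);
            return s == null ? "" : s;
        }

        long getLong(Keys k) {               // missing value defaults to 0
            String s = values.get(k);
            return s != null ? Long.parseLong(s) : 0L;
        }

        public static void main(String[] args) {
            KeyValuePairSketch p = new KeyValuePairSketch();
            p.values.put(Keys.JOBID, "job_201501010000_0001");
            System.out.println(p.get(Keys.JOBID));          // job_201501010000_0001
            System.out.println(p.get(Keys.SUBMIT_TIME));    // "" rather than null
            System.out.println(p.getLong(Keys.TOTAL_MAPS)); // 0
        }
    }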
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java
index 1c096fc..5d48d5d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java
@@ -24,15 +24,15 @@ import java.io.IOException;
 import java.util.Map;
 
 /**
- * Callback interface for reading back log events from JobHistory. This interface 
- * should be implemented and passed to JobHistory.parseHistory() 
- *
+ * Callback interface for reading back log events from JobHistory. This interface
+ * should be implemented and passed to JobHistory.parseHistory().
  */
-public interface JHFMRVer1PerLineListener{
+public interface JHFMRVer1PerLineListener {
     /**
-     * Callback method for history parser. 
-     * @param recType type of record, which is the first entry in the line. 
-     * @param values a map of key-value pairs as thry appear in history.
+     * Callback method for history parser.
+     *
+     * @param recType type of record, which is the first entry in the line.
+     * @param values  a map of key-value pairs as they appear in the history.
      * @throws IOException
      */
     void handle(RecordTypes recType, Map<Keys, String> values) throws Exception;
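
To show what implementing this callback looks like in practice, here is a hedged, self-contained sketch with simplified stand-ins for RecordTypes and Keys; a real implementation would instead implement org.apache.eagle.jpm.mr.history.parser.JHFMRVer1PerLineListener and be handed to the parser:

    import java.util.EnumMap;
    import java.util.Map;

    public class PerLineListenerSketch {
        enum RecordTypes { Meta, Job, Task }
        enum Keys { JOBID, JOB_STATUS }

        // Mirrors the shape of JHFMRVer1PerLineListener.handle(recType, values)
        interface PerLineListener {
            void handle(RecordTypes recType, Map<Keys, String> values) throws Exception;
        }

        public static void main(String[] args) throws Exception {
            PerLineListener listener = (recType, values) -> {
                if (recType == RecordTypes.Job) {
                    System.out.println("job " + values.get(Keys.JOBID)
                        + " finished with status " + values.get(Keys.JOB_STATUS));
                }
            };
            Map<Keys, String> values = new EnumMap<>(Keys.class);
            values.put(Keys.JOBID, "job_201501010000_0001");
            values.put(Keys.JOB_STATUS, "SUCCESS");
            listener.handle(RecordTypes.Job, values);   // one call per history line
        }
    }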


[2/3] incubator-eagle git commit: [EAGLE-461] Convert MR history app with new app framework

Posted by ha...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
index 2d960b0..4ca0449 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -19,8 +19,8 @@
 package org.apache.eagle.jpm.mr.history.parser;
 
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
-import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +41,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
 
     /**
      * Create a new Event Reader
+     *
      * @throws IOException
      */
     public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
@@ -50,86 +51,115 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
     @SuppressWarnings("deprecation")
     public void handleEvent(Event wrapper) throws Exception {
         switch (wrapper.type) {
-        case JOB_SUBMITTED:
-            handleJobSubmitted(wrapper); break;
-        case JOB_INITED:
-            handleJobInited(wrapper); break;
-        case JOB_FINISHED:
-            handleJobFinished(wrapper); break;
-        case JOB_PRIORITY_CHANGED:
-            handleJobPriorityChanged(); break;
-        case JOB_STATUS_CHANGED:
-            handleJobStatusChanged(); break;
-        case JOB_FAILED:
-        case JOB_KILLED:
-        case JOB_ERROR:
-            handleJobUnsuccessfulCompletion(wrapper); break;
-        case JOB_INFO_CHANGED:
-            handleJobInfoChanged(); break;
-        case JOB_QUEUE_CHANGED:
-            handleJobQueueChanged();break;
-        case TASK_STARTED:
-            handleTaskStarted(wrapper); break;
-        case TASK_FINISHED:
-            handleTaskFinished(wrapper); break;
-        case TASK_FAILED:
-            handleTaskFailed(wrapper); break;
-        case TASK_UPDATED:
-            handleTaskUpdated(); break;
-
-        // map task
-        case MAP_ATTEMPT_STARTED:
-            handleMapAttemptStarted(wrapper); break;
-        case MAP_ATTEMPT_FINISHED:
-            handleMapAttemptFinished(wrapper); break;
-        case MAP_ATTEMPT_FAILED:
-            handleMapAttemptFailed(wrapper); break;
-        case MAP_ATTEMPT_KILLED:
-            handleMapAttemptKilled(wrapper); break;
-
-        // reduce task
-        case REDUCE_ATTEMPT_STARTED:
-            handleReduceAttemptStarted(wrapper); break;
-        case REDUCE_ATTEMPT_FINISHED:
-            handleReduceAttemptFinished(wrapper); break;
-        case REDUCE_ATTEMPT_FAILED:
-            handleReduceAttemptFailed(wrapper); break;
-        case REDUCE_ATTEMPT_KILLED:
-            handleReduceAttemptKilled(wrapper); break;
-
-        // set up task
-        case SETUP_ATTEMPT_STARTED:
-            break;
-        case SETUP_ATTEMPT_FINISHED:
-            handleSetupAttemptFinished(); break;
-        case SETUP_ATTEMPT_FAILED:
-            handleSetupAttemptFailed(); break;
-        case SETUP_ATTEMPT_KILLED:
-            handleSetupAttemptKilled(); break;
-
-        // clean up task
-        case CLEANUP_ATTEMPT_STARTED:
-            break;
-        case CLEANUP_ATTEMPT_FINISHED:
-            handleCleanupAttemptFinished(); break;
-        case CLEANUP_ATTEMPT_FAILED:
-            handleCleanupAttemptFailed(); break;
-        case CLEANUP_ATTEMPT_KILLED:
-            handleCleanupAttemptKilled(); break;
-
-        case AM_STARTED:
-            handleAMStarted(); break;
-        default:
-            logger.warn("unexpected event type: " + wrapper.type);
+            case JOB_SUBMITTED:
+                handleJobSubmitted(wrapper);
+                break;
+            case JOB_INITED:
+                handleJobInited(wrapper);
+                break;
+            case JOB_FINISHED:
+                handleJobFinished(wrapper);
+                break;
+            case JOB_PRIORITY_CHANGED:
+                handleJobPriorityChanged();
+                break;
+            case JOB_STATUS_CHANGED:
+                handleJobStatusChanged();
+                break;
+            case JOB_FAILED:
+            case JOB_KILLED:
+            case JOB_ERROR:
+                handleJobUnsuccessfulCompletion(wrapper);
+                break;
+            case JOB_INFO_CHANGED:
+                handleJobInfoChanged();
+                break;
+            case JOB_QUEUE_CHANGED:
+                handleJobQueueChanged();
+                break;
+            case TASK_STARTED:
+                handleTaskStarted(wrapper);
+                break;
+            case TASK_FINISHED:
+                handleTaskFinished(wrapper);
+                break;
+            case TASK_FAILED:
+                handleTaskFailed(wrapper);
+                break;
+            case TASK_UPDATED:
+                handleTaskUpdated();
+                break;
+
+            // map task
+            case MAP_ATTEMPT_STARTED:
+                handleMapAttemptStarted(wrapper);
+                break;
+            case MAP_ATTEMPT_FINISHED:
+                handleMapAttemptFinished(wrapper);
+                break;
+            case MAP_ATTEMPT_FAILED:
+                handleMapAttemptFailed(wrapper);
+                break;
+            case MAP_ATTEMPT_KILLED:
+                handleMapAttemptKilled(wrapper);
+                break;
+
+            // reduce task
+            case REDUCE_ATTEMPT_STARTED:
+                handleReduceAttemptStarted(wrapper);
+                break;
+            case REDUCE_ATTEMPT_FINISHED:
+                handleReduceAttemptFinished(wrapper);
+                break;
+            case REDUCE_ATTEMPT_FAILED:
+                handleReduceAttemptFailed(wrapper);
+                break;
+            case REDUCE_ATTEMPT_KILLED:
+                handleReduceAttemptKilled(wrapper);
+                break;
+
+            // set up task
+            case SETUP_ATTEMPT_STARTED:
+                break;
+            case SETUP_ATTEMPT_FINISHED:
+                handleSetupAttemptFinished();
+                break;
+            case SETUP_ATTEMPT_FAILED:
+                handleSetupAttemptFailed();
+                break;
+            case SETUP_ATTEMPT_KILLED:
+                handleSetupAttemptKilled();
+                break;
+
+            // clean up task
+            case CLEANUP_ATTEMPT_STARTED:
+                break;
+            case CLEANUP_ATTEMPT_FINISHED:
+                handleCleanupAttemptFinished();
+                break;
+            case CLEANUP_ATTEMPT_FAILED:
+                handleCleanupAttemptFailed();
+                break;
+            case CLEANUP_ATTEMPT_KILLED:
+                handleCleanupAttemptKilled();
+                break;
+
+            case AM_STARTED:
+                handleAMStarted();
+                break;
+            default:
+                logger.warn("unexpected event type: " + wrapper.type);
         }
     }
 
     private void handleJobPriorityChanged() throws Exception {
         return;
     }
+
     private void handleJobStatusChanged() throws Exception {
         return;
     }
+
     private void handleJobInfoChanged() throws Exception {
         return;
     }
@@ -139,150 +169,285 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
     }
 
     private void handleJobSubmitted(Event wrapper) throws Exception {
-        JobSubmitted js = ((JobSubmitted)wrapper.getEvent());
+        JobSubmitted js = ((JobSubmitted) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
-        if (js.getJobName() != null) values.put(Keys.JOBNAME, js.getJobName().toString());
-        if (js.getUserName() != null) values.put(Keys.USER, js.getUserName().toString());
-        if (js.getSubmitTime() != null) values.put(Keys.SUBMIT_TIME, js.getSubmitTime().toString());
-        if (js.getJobQueueName() != null) values.put(Keys.JOB_QUEUE, js.getJobQueueName().toString());
+        if (js.getJobid() != null) {
+            values.put(Keys.JOBID, js.getJobid().toString());
+        }
+        if (js.getJobName() != null) {
+            values.put(Keys.JOBNAME, js.getJobName().toString());
+        }
+        if (js.getUserName() != null) {
+            values.put(Keys.USER, js.getUserName().toString());
+        }
+        if (js.getSubmitTime() != null) {
+            values.put(Keys.SUBMIT_TIME, js.getSubmitTime().toString());
+        }
+        if (js.getJobQueueName() != null) {
+            values.put(Keys.JOB_QUEUE, js.getJobQueueName().toString());
+        }
         handleJob(wrapper.getType(), values, null);
     }
 
     private void handleJobInited(Event wrapper) throws Exception {
-        JobInited js = ((JobInited)wrapper.getEvent());
+        JobInited js = ((JobInited) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
-        if (js.getLaunchTime() != null) values.put(Keys.LAUNCH_TIME, js.getLaunchTime().toString());
-        if (js.getTotalMaps() != null) values.put(Keys.TOTAL_MAPS, js.getTotalMaps().toString());
-        if (js.getTotalReduces() != null) values.put(Keys.TOTAL_REDUCES, js.getTotalReduces().toString());
-        if (js.getJobStatus() != null) values.put(Keys.JOB_STATUS, js.getJobStatus().toString());
-        if (js.getUberized() != null) values.put(Keys.UBERISED, js.getUberized().toString());
+        if (js.getJobid() != null) {
+            values.put(Keys.JOBID, js.getJobid().toString());
+        }
+        if (js.getLaunchTime() != null) {
+            values.put(Keys.LAUNCH_TIME, js.getLaunchTime().toString());
+        }
+        if (js.getTotalMaps() != null) {
+            values.put(Keys.TOTAL_MAPS, js.getTotalMaps().toString());
+        }
+        if (js.getTotalReduces() != null) {
+            values.put(Keys.TOTAL_REDUCES, js.getTotalReduces().toString());
+        }
+        if (js.getJobStatus() != null) {
+            values.put(Keys.JOB_STATUS, js.getJobStatus().toString());
+        }
+        if (js.getUberized() != null) {
+            values.put(Keys.UBERISED, js.getUberized().toString());
+        }
         handleJob(wrapper.getType(), values, null);
     }
 
     private void handleJobFinished(Event wrapper) throws Exception {
-        JobFinished js = ((JobFinished)wrapper.getEvent());
+        JobFinished js = ((JobFinished) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
-        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
-        if (js.getFinishedMaps() != null) values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString());
-        if (js.getFinishedReduces() != null) values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString());
-        if (js.getFailedMaps() != null) values.put(Keys.FAILED_MAPS, js.getFailedMaps().toString());
-        if (js.getFailedReduces() != null) values.put(Keys.FAILED_REDUCES, js.getFailedReduces().toString());
+        if (js.getJobid() != null) {
+            values.put(Keys.JOBID, js.getJobid().toString());
+        }
+        if (js.getFinishTime() != null) {
+            values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        }
+        if (js.getFinishedMaps() != null) {
+            values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString());
+        }
+        if (js.getFinishedReduces() != null) {
+            values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString());
+        }
+        if (js.getFailedMaps() != null) {
+            values.put(Keys.FAILED_MAPS, js.getFailedMaps().toString());
+        }
+        if (js.getFailedReduces() != null) {
+            values.put(Keys.FAILED_REDUCES, js.getFailedReduces().toString());
+        }
         values.put(Keys.JOB_STATUS, EagleJobStatus.SUCCESS.name());
         handleJob(wrapper.getType(), values, js.getTotalCounters());
     }
 
     private void handleJobUnsuccessfulCompletion(Event wrapper) throws Exception {
-        JobUnsuccessfulCompletion js = ((JobUnsuccessfulCompletion)wrapper.getEvent());
+        JobUnsuccessfulCompletion js = ((JobUnsuccessfulCompletion) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getJobid() != null) values.put(Keys.JOBID, js.getJobid().toString());
-        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
-        if (js.getFinishedMaps() != null) values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString());
-        if (js.getFinishedReduces() != null) values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString());
-        if (js.getJobStatus() != null) values.put(Keys.JOB_STATUS, js.getJobStatus().toString());
+        if (js.getJobid() != null) {
+            values.put(Keys.JOBID, js.getJobid().toString());
+        }
+        if (js.getFinishTime() != null) {
+            values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        }
+        if (js.getFinishedMaps() != null) {
+            values.put(Keys.FINISHED_MAPS, js.getFinishedMaps().toString());
+        }
+        if (js.getFinishedReduces() != null) {
+            values.put(Keys.FINISHED_REDUCES, js.getFinishedReduces().toString());
+        }
+        if (js.getJobStatus() != null) {
+            values.put(Keys.JOB_STATUS, js.getJobStatus().toString());
+        }
         handleJob(wrapper.getType(), values, null);
     }
 
     private void handleTaskStarted(Event wrapper) throws Exception {
-        TaskStarted js = ((TaskStarted)wrapper.getEvent());
+        TaskStarted js = ((TaskStarted) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
-        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
-        if (js.getStartTime() != null) values.put(Keys.START_TIME, js.getStartTime().toString());
-        if (js.getSplitLocations() != null) values.put(Keys.SPLIT_LOCATIONS, js.getSplitLocations().toString());
+        if (js.getTaskid() != null) {
+            values.put(Keys.TASKID, js.getTaskid().toString());
+        }
+        if (js.getTaskType() != null) {
+            values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        }
+        if (js.getStartTime() != null) {
+            values.put(Keys.START_TIME, js.getStartTime().toString());
+        }
+        if (js.getSplitLocations() != null) {
+            values.put(Keys.SPLIT_LOCATIONS, js.getSplitLocations().toString());
+        }
         handleTask(RecordTypes.Task, wrapper.getType(), values, null);
     }
 
     private void handleTaskFinished(Event wrapper) throws Exception {
-        TaskFinished js = ((TaskFinished)wrapper.getEvent());
+        TaskFinished js = ((TaskFinished) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
-        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
-        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
-        if (js.getStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+        if (js.getTaskid() != null) {
+            values.put(Keys.TASKID, js.getTaskid().toString());
+        }
+        if (js.getTaskType() != null) {
+            values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        }
+        if (js.getFinishTime() != null) {
+            values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        }
+        if (js.getStatus() != null) {
+            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+        }
         handleTask(RecordTypes.Task, wrapper.getType(), values, js.getCounters());
     }
 
     private void handleTaskFailed(Event wrapper) throws Exception {
-        TaskFailed js = ((TaskFailed)wrapper.getEvent());
+        TaskFailed js = ((TaskFailed) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
 
-        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
-        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
-        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
-        if (js.getStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
-        if (js.getError() != null) values.put(Keys.ERROR, js.getError().toString());
-        if (js.getFailedDueToAttempt() != null) values.put(Keys.FAILED_DUE_TO_ATTEMPT, js.getFailedDueToAttempt().toString());
+        if (js.getTaskid() != null) {
+            values.put(Keys.TASKID, js.getTaskid().toString());
+        }
+        if (js.getTaskType() != null) {
+            values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        }
+        if (js.getFinishTime() != null) {
+            values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        }
+        if (js.getStatus() != null) {
+            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+        }
+        if (js.getError() != null) {
+            values.put(Keys.ERROR, js.getError().toString());
+        }
+        if (js.getFailedDueToAttempt() != null) {
+            values.put(Keys.FAILED_DUE_TO_ATTEMPT, js.getFailedDueToAttempt().toString());
+        }
         handleTask(RecordTypes.Task, wrapper.getType(), values, js.getCounters());
     }
 
     private String normalizeTaskStatus(String taskStatus) {
-        if (taskStatus.equals("SUCCEEDED"))
+        if (taskStatus.equals("SUCCEEDED")) {
             return EagleTaskStatus.SUCCESS.name();
+        }
         return taskStatus;
     }
 
-    private void handleTaskUpdated(){
+    private void handleTaskUpdated() {
         return;
     }
 
     private void handleMapAttemptStarted(Event wrapper) throws Exception {
         handleTaskAttemptStarted(wrapper, RecordTypes.MapAttempt);
     }
+
     private void handleReduceAttemptStarted(Event wrapper) throws Exception {
         handleTaskAttemptStarted(wrapper, RecordTypes.ReduceAttempt);
     }
 
     private void handleTaskAttemptStarted(Event wrapper, RecordTypes recordType) throws Exception {
-        TaskAttemptStarted js = ((TaskAttemptStarted)wrapper.getEvent());
+        TaskAttemptStarted js = ((TaskAttemptStarted) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
-        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
-        if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
-        if (js.getStartTime() != null) values.put(Keys.START_TIME, js.getStartTime().toString());
-        if (js.getTrackerName() != null) values.put(Keys.TRACKER_NAME, js.getTrackerName().toString());
-        if (js.getHttpPort() != null) values.put(Keys.HTTP_PORT, js.getHttpPort().toString());
-        if (js.getShufflePort() != null) values.put(Keys.SHUFFLE_PORT, js.getShufflePort().toString());
-        if (js.getLocality() != null) values.put(Keys.LOCALITY, js.getLocality().toString());
-        if (js.getAvataar() != null) values.put(Keys.AVATAAR, js.getAvataar().toString());
-        handleTask(recordType,wrapper.getType(), values, null);
+        if (js.getTaskid() != null) {
+            values.put(Keys.TASKID, js.getTaskid().toString());
+        }
+        if (js.getTaskType() != null) {
+            values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        }
+        if (js.getAttemptId() != null) {
+            values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+        }
+        if (js.getStartTime() != null) {
+            values.put(Keys.START_TIME, js.getStartTime().toString());
+        }
+        if (js.getTrackerName() != null) {
+            values.put(Keys.TRACKER_NAME, js.getTrackerName().toString());
+        }
+        if (js.getHttpPort() != null) {
+            values.put(Keys.HTTP_PORT, js.getHttpPort().toString());
+        }
+        if (js.getShufflePort() != null) {
+            values.put(Keys.SHUFFLE_PORT, js.getShufflePort().toString());
+        }
+        if (js.getLocality() != null) {
+            values.put(Keys.LOCALITY, js.getLocality().toString());
+        }
+        if (js.getAvataar() != null) {
+            values.put(Keys.AVATAAR, js.getAvataar().toString());
+        }
+        handleTask(recordType, wrapper.getType(), values, null);
     }
 
     private void handleMapAttemptFinished(Event wrapper) throws Exception {
-        MapAttemptFinished js = ((MapAttemptFinished)wrapper.getEvent());
+        MapAttemptFinished js = ((MapAttemptFinished) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
-        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
-        if (js.getTaskStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
-        if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
-        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
-        if (js.getMapFinishTime() != null) values.put(Keys.MAP_FINISH_TIME, js.getMapFinishTime().toString());
-        if (js.getHostname() != null) values.put(Keys.HOSTNAME, js.getHostname().toString());
-        if (js.getPort() != null) values.put(Keys.PORT, js.getPort().toString());
-        if (js.getRackname() != null) values.put(Keys.RACK_NAME, js.getRackname().toString());
-        if (js.getState() != null) values.put(Keys.STATE_STRING, js.getState().toString());
+        if (js.getTaskid() != null) {
+            values.put(Keys.TASKID, js.getTaskid().toString());
+        }
+        if (js.getTaskType() != null) {
+            values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        }
+        if (js.getTaskStatus() != null) {
+            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
+        }
+        if (js.getAttemptId() != null) {
+            values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+        }
+        if (js.getFinishTime() != null) {
+            values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        }
+        if (js.getMapFinishTime() != null) {
+            values.put(Keys.MAP_FINISH_TIME, js.getMapFinishTime().toString());
+        }
+        if (js.getHostname() != null) {
+            values.put(Keys.HOSTNAME, js.getHostname().toString());
+        }
+        if (js.getPort() != null) {
+            values.put(Keys.PORT, js.getPort().toString());
+        }
+        if (js.getRackname() != null) {
+            values.put(Keys.RACK_NAME, js.getRackname().toString());
+        }
+        if (js.getState() != null) {
+            values.put(Keys.STATE_STRING, js.getState().toString());
+        }
 
         ensureRackAfterAttemptFinish(js.getRackname().toString(), values);
-        handleTask(RecordTypes.MapAttempt,wrapper.getType(), values, js.getCounters());
+        handleTask(RecordTypes.MapAttempt, wrapper.getType(), values, js.getCounters());
     }
+
     private void handleReduceAttemptFinished(Event wrapper) throws Exception {
-        ReduceAttemptFinished js = ((ReduceAttemptFinished)wrapper.getEvent());
+        ReduceAttemptFinished js = ((ReduceAttemptFinished) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
-        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
-        if (js.getTaskStatus() != null) values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
-        if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
-        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
-        if (js.getShuffleFinishTime() != null) values.put(Keys.SHUFFLE_FINISHED, js.getShuffleFinishTime().toString());
-        if (js.getSortFinishTime() != null) values.put(Keys.SORT_FINISHED, js.getSortFinishTime().toString());
-        if (js.getHostname() != null) values.put(Keys.HOSTNAME, js.getHostname().toString());
-        if (js.getPort() != null) values.put(Keys.PORT, js.getPort().toString());
-        if (js.getRackname() != null) values.put(Keys.RACK_NAME, js.getRackname().toString());
-        if (js.getState() != null) values.put(Keys.STATE_STRING, js.getState().toString());
+        if (js.getTaskid() != null) {
+            values.put(Keys.TASKID, js.getTaskid().toString());
+        }
+        if (js.getTaskType() != null) {
+            values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        }
+        if (js.getTaskStatus() != null) {
+            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
+        }
+        if (js.getAttemptId() != null) {
+            values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+        }
+        if (js.getFinishTime() != null) {
+            values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        }
+        if (js.getShuffleFinishTime() != null) {
+            values.put(Keys.SHUFFLE_FINISHED, js.getShuffleFinishTime().toString());
+        }
+        if (js.getSortFinishTime() != null) {
+            values.put(Keys.SORT_FINISHED, js.getSortFinishTime().toString());
+        }
+        if (js.getHostname() != null) {
+            values.put(Keys.HOSTNAME, js.getHostname().toString());
+        }
+        if (js.getPort() != null) {
+            values.put(Keys.PORT, js.getPort().toString());
+        }
+        if (js.getRackname() != null) {
+            values.put(Keys.RACK_NAME, js.getRackname().toString());
+        }
+        if (js.getState() != null) {
+            values.put(Keys.STATE_STRING, js.getState().toString());
+        }
         ensureRackAfterAttemptFinish(js.getRackname().toString(), values);
-        handleTask(RecordTypes.ReduceAttempt,wrapper.getType(), values, js.getCounters());
+        handleTask(RecordTypes.ReduceAttempt, wrapper.getType(), values, js.getCounters());
     }
 
     private void ensureRackAfterAttemptFinish(String rackname, Map<Keys, String> values) {
@@ -296,61 +461,89 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
     private void handleMapAttemptFailed(Event wrapper) throws Exception {
         handleTaskAttemptFailed(wrapper, RecordTypes.MapAttempt);
     }
+
     private void handleReduceAttemptFailed(Event wrapper) throws Exception {
         handleTaskAttemptFailed(wrapper, RecordTypes.ReduceAttempt);
     }
-    private void handleMapAttemptKilled(Event wrapper)throws Exception {
+
+    private void handleMapAttemptKilled(Event wrapper) throws Exception {
         handleTaskAttemptFailed(wrapper, RecordTypes.MapAttempt);
     }
-    private void handleReduceAttemptKilled(Event wrapper)throws Exception {
+
+    private void handleReduceAttemptKilled(Event wrapper) throws Exception {
         handleTaskAttemptFailed(wrapper, RecordTypes.ReduceAttempt);
     }
-    
+
     private void handleTaskAttemptFailed(Event wrapper, RecordTypes recordType) throws Exception {
-        TaskAttemptUnsuccessfulCompletion js = ((TaskAttemptUnsuccessfulCompletion)wrapper.getEvent());
+        TaskAttemptUnsuccessfulCompletion js = ((TaskAttemptUnsuccessfulCompletion) wrapper.getEvent());
         Map<Keys, String> values = new HashMap<>();
-        if (js.getTaskid() != null) values.put(Keys.TASKID, js.getTaskid().toString());
-        if (js.getTaskType() != null) values.put(Keys.TASK_TYPE, js.getTaskType().toString());
-        if (js.getAttemptId() != null) values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
-        if (js.getFinishTime() != null) values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
-        if (js.getHostname() != null) values.put(Keys.HOSTNAME, js.getHostname().toString());
-        if (js.getPort() != null) values.put(Keys.PORT, js.getPort().toString());
-        if (js.getRackname() != null) values.put(Keys.RACK_NAME, js.getRackname().toString());
-        if (js.getError() != null) values.put(Keys.ERROR, js.getError().toString());
-        if (js.getStatus() != null) values.put(Keys.TASK_STATUS, js.getStatus().toString());
+        if (js.getTaskid() != null) {
+            values.put(Keys.TASKID, js.getTaskid().toString());
+        }
+        if (js.getTaskType() != null) {
+            values.put(Keys.TASK_TYPE, js.getTaskType().toString());
+        }
+        if (js.getAttemptId() != null) {
+            values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
+        }
+        if (js.getFinishTime() != null) {
+            values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
+        }
+        if (js.getHostname() != null) {
+            values.put(Keys.HOSTNAME, js.getHostname().toString());
+        }
+        if (js.getPort() != null) {
+            values.put(Keys.PORT, js.getPort().toString());
+        }
+        if (js.getRackname() != null) {
+            values.put(Keys.RACK_NAME, js.getRackname().toString());
+        }
+        if (js.getError() != null) {
+            values.put(Keys.ERROR, js.getError().toString());
+        }
+        if (js.getStatus() != null) {
+            values.put(Keys.TASK_STATUS, js.getStatus().toString());
+        }
         ensureRackAfterAttemptFinish(js.getRackname().toString(), values);
-        handleTask(recordType,wrapper.getType(), values, js.getCounters());
-    }    
-    private void handleSetupAttemptFinished(){
+        handleTask(recordType, wrapper.getType(), values, js.getCounters());
+    }
+
+    private void handleSetupAttemptFinished() {
         return;
     }
-    private void handleSetupAttemptFailed(){
+
+    private void handleSetupAttemptFailed() {
         return;
     }
-    private void handleSetupAttemptKilled(){
+
+    private void handleSetupAttemptKilled() {
         return;
     }
-    private void handleCleanupAttemptFinished(){
+
+    private void handleCleanupAttemptFinished() {
         return;
     }
-    private void handleCleanupAttemptFailed(){
+
+    private void handleCleanupAttemptFailed() {
         return;
     }
-    private void handleCleanupAttemptKilled(){
+
+    private void handleCleanupAttemptKilled() {
         return;
     }
-    private void handleAMStarted(){
+
+    private void handleAMStarted() {
         return;
     }
 
-    protected JobCounters parseCounters(Object value) throws IOException{
+    protected JobCounters parseCounters(Object value) throws IOException {
         JobCounters jc = new JobCounters();
         Map<String, Map<String, Long>> groups = new HashMap<>();
-        JhCounters counters = (JhCounters)value;
+        JhCounters counters = (JhCounters) value;
         List<JhCounterGroup> list = counters.getGroups();
         for (JhCounterGroup cg : list) {
             String cgName = cg.getName().toString();
-            if(!cgName.equals("org.apache.hadoop.mapreduce.FileSystemCounter")
+            if (!cgName.equals("org.apache.hadoop.mapreduce.FileSystemCounter")
                 && !cgName.equals("org.apache.hadoop.mapreduce.TaskCounter")
                 && !cgName.equals("org.apache.hadoop.mapreduce.JobCounter")
                 && !cgName.equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter")
@@ -359,7 +552,9 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
                 && !cgName.equals("org.apache.hadoop.mapred.Task$Counter")                        // for artemis
                 && !cgName.equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormat$Counter") // for artemis
                 && !cgName.equals("org.apache.hadoop.mapreduce.lib.input.FileOutputFormat$Counter") // for artemis
-            ) continue;
+                ) {
+                continue;
+            }
 
             groups.put(cgName, new HashMap<String, Long>());
             Map<String, Long> counterValues = groups.get(cgName);
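
The parseCounters() change above is whitespace-only, but the shape of the method is easy to lose in the churn: whitelist a handful of counter-group names, then fold each surviving group's counters into a nested map keyed by group and counter name. A stripped-down sketch of that pattern (the counter source here is a hard-coded array; the real code iterates JhCounterGroup/JhCounter objects decoded from Avro):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class CounterFoldSketch {
        private static final Set<String> WANTED = new HashSet<>(Arrays.asList(
            "org.apache.hadoop.mapreduce.FileSystemCounter",
            "org.apache.hadoop.mapreduce.TaskCounter",
            "org.apache.hadoop.mapreduce.JobCounter"));

        public static void main(String[] args) {
            // groupName -> (counterName -> value), the layout JobCounters expects
            Map<String, Map<String, Long>> groups = new HashMap<>();
            String[][] raw = {
                {"org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS", "42"},
                {"com.example.CustomGroup", "IGNORED", "1"},    // filtered out
            };
            for (String[] c : raw) {
                if (!WANTED.contains(c[0])) {
                    continue;                                   // whitelist check
                }
                if (!groups.containsKey(c[0])) {
                    groups.put(c[0], new HashMap<String, Long>());
                }
                groups.get(c[0]).put(c[1], Long.parseLong(c[2]));
            }
            System.out.println(groups);
        }
    }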

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
index c289e4d..2ccbf8d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2Parser.java
@@ -19,8 +19,9 @@
 package org.apache.eagle.jpm.mr.history.parser;
 
 import org.apache.avro.Schema;
-import org.apache.avro.io.*;
-import org.apache.avro.specific.SpecificData;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.JsonDecoder;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.hadoop.mapreduce.jobhistory.Event;
 import org.slf4j.Logger;
@@ -33,35 +34,35 @@ import java.io.InputStream;
 public class JHFMRVer2Parser implements JHFParserBase {
     private static final Logger logger = LoggerFactory.getLogger(JHFMRVer2Parser.class);
     private JHFMRVer2EventReader _reader;
-    
-    public JHFMRVer2Parser(JHFMRVer2EventReader reader){
+
+    public JHFMRVer2Parser(JHFMRVer2EventReader reader) {
         this._reader = reader;
     }
 
-    @SuppressWarnings({ "rawtypes", "deprecation" })
+    @SuppressWarnings( {"rawtypes", "deprecation"})
     @Override
     public void parse(InputStream is) throws Exception {
         int eventCtr = 0;
         try {
-             long start = System.currentTimeMillis();
-             DataInputStream in = new DataInputStream(is);
-             String version = in.readLine();
-             if (!"Avro-Json".equals(version)) {
-                 throw new IOException("Incompatible event log version: " + version);
-             }
-                
-             Schema schema = Schema.parse(in.readLine());
-             SpecificDatumReader datumReader = new SpecificDatumReader(schema);
-             JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, in);
+            long start = System.currentTimeMillis();
+            DataInputStream in = new DataInputStream(is);
+            String version = in.readLine();
+            if (!"Avro-Json".equals(version)) {
+                throw new IOException("Incompatible event log version: " + version);
+            }
+
+            Schema schema = Schema.parse(in.readLine());
+            SpecificDatumReader datumReader = new SpecificDatumReader(schema);
+            JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, in);
 
-             Event wrapper;
-             while ((wrapper = getNextEvent(datumReader, decoder)) != null) {
-                 ++eventCtr;
-                 _reader.handleEvent(wrapper);
-             }
-             _reader.parseConfiguration();
-             // don't need put to finally as it's a kind of flushing data
-             _reader.close();
+            Event wrapper;
+            while ((wrapper = getNextEvent(datumReader, decoder)) != null) {
+                ++eventCtr;
+                _reader.handleEvent(wrapper);
+            }
+            _reader.parseConfiguration();
+            // deliberately not in a finally block: close() flushes remaining data, so it should only run on success
+            _reader.close();
             logger.info("reader used " + (System.currentTimeMillis() - start) + "ms");
         } catch (Exception ioe) {
             logger.error("Caught exception parsing history file after " + eventCtr + " events", ioe);
@@ -71,13 +72,13 @@ public class JHFMRVer2Parser implements JHFParserBase {
                 is.close();
             }
         }
-      }
-    
-    @SuppressWarnings({ "rawtypes", "unchecked" })
+    }
+
+    @SuppressWarnings( {"rawtypes", "unchecked"})
     public Event getNextEvent(DatumReader datumReader, JsonDecoder decoder) throws Exception {
         Event wrapper;
         try {
-            wrapper = (Event)datumReader.read(null, decoder);
+            wrapper = (Event) datumReader.read(null, decoder);
         } catch (java.io.EOFException e) {            // at EOF
             return null;
         }
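
For context on the re-indented parse() above: a v2 job-history file is line-framed, with a literal "Avro-Json" version line, then the Avro schema as a single JSON line, then one JSON-encoded event per record until EOF. A hedged sketch of just the header handshake (the stream contents here are made up; the real stream comes from HDFS):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.StringReader;

    public class HistoryHeaderSketch {
        public static void main(String[] args) throws IOException {
            String file = "Avro-Json\n"
                + "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[]}\n";
            BufferedReader in = new BufferedReader(new StringReader(file));
            String version = in.readLine();
            if (!"Avro-Json".equals(version)) {        // same guard as parse()
                throw new IOException("Incompatible event log version: " + version);
            }
            String schemaJson = in.readLine();         // Avro schema, one line
            System.out.println("schema = " + schemaJson);
        }
    }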

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
index e4f437f..a2479ab 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserBase.java
@@ -21,13 +21,12 @@ package org.apache.eagle.jpm.mr.history.parser;
 import java.io.InputStream;
 
 /**
- *
  * @author yonzhang
- *
  */
 public interface JHFParserBase {
     /**
     * This method guarantees that the input stream is closed.
+     *
      * @param is
      * @throws Exception
      */
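
The practical consequence of this contract, as a hedged usage sketch: callers hand the stream to parse() and do not close it themselves. The lambda and input below are illustrative; in the crawler the stream comes from HDFS:

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;

    public class ParserContractSketch {
        interface JHFParserBase {                      // mirrors the real interface
            void parse(InputStream is) throws Exception;
        }

        public static void main(String[] args) throws Exception {
            JHFParserBase parser = is -> {
                try {
                    System.out.println("read " + is.available() + " bytes");
                } finally {
                    is.close();                        // the parser closes, not the caller
                }
            };
            parser.parse(new ByteArrayInputStream("history bytes".getBytes()));
        }
    }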

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
index dd640c2..718612d 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
@@ -30,7 +30,7 @@ public class JHFParserFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(JHFParserFactory.class);
 
-    public static JHFParserBase getParser(JHFConfigManager configManager, Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
+    public static JHFParserBase getParser(MRHistoryJobConfig configManager, Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
         String format = configManager.getJobExtractorConfig().mrVersion;
         JHFParserBase parser;
         JHFFormat f;
@@ -40,31 +40,31 @@ public class JHFParserFactory {
             } else {
                 f = JHFFormat.valueOf(format);
             }
-        } catch(IllegalArgumentException ex) {
+        } catch (IllegalArgumentException ex) {
             f = JHFFormat.MRVer1; // fall back to version 1 unless it's specified as version 2
         }
-        
+
         switch (f) {
-        case MRVer2:
-            JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter);
-            reader2.addListener(new JobEntityCreationEagleServiceListener(configManager));
-            reader2.addListener(new TaskFailureListener(configManager));
-            reader2.addListener(new TaskAttemptCounterListener(configManager));
-            reader2.addListener(new JobConfigurationCreationServiceListener(configManager));
+            case MRVer2:
+                JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter);
+                reader2.addListener(new JobEntityCreationEagleServiceListener(configManager));
+                reader2.addListener(new TaskFailureListener(configManager));
+                reader2.addListener(new TaskAttemptCounterListener(configManager));
+                reader2.addListener(new JobConfigurationCreationServiceListener(configManager));
 
-            reader2.register(new JobEntityLifecycleAggregator());
-            parser = new JHFMRVer2Parser(reader2);
-            break;
-        case MRVer1:
-        default:
-            JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter);
-            reader1.addListener(new JobEntityCreationEagleServiceListener(configManager));
-            reader1.addListener(new TaskFailureListener(configManager));
-            reader1.addListener(new TaskAttemptCounterListener(configManager));
+                reader2.register(new JobEntityLifecycleAggregator());
+                parser = new JHFMRVer2Parser(reader2);
+                break;
+            case MRVer1:
+            default:
+                JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter);
+                reader1.addListener(new JobEntityCreationEagleServiceListener(configManager));
+                reader1.addListener(new TaskFailureListener(configManager));
+                reader1.addListener(new TaskAttemptCounterListener(configManager));
 
-            reader1.register(new JobEntityLifecycleAggregator());
-            parser = new JHFMRVer1Parser(reader1);
-            break;
+                reader1.register(new JobEntityLifecycleAggregator());
+                parser = new JHFMRVer1Parser(reader1);
+                break;
         }
         return parser;
     }
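
One behavior worth calling out in getParser(): an unrecognized mrVersion string does not fail; the IllegalArgumentException from Enum.valueOf is caught and the factory falls back to MRVer1. A tiny standalone sketch of that fallback idiom:

    public class FormatFallbackSketch {
        enum JHFFormat { MRVer1, MRVer2 }

        static JHFFormat resolve(String format) {
            try {
                if (format == null) {
                    return JHFFormat.MRVer1;
                }
                return JHFFormat.valueOf(format);
            } catch (IllegalArgumentException ex) {
                return JHFFormat.MRVer1;   // fall back unless explicitly valid
            }
        }

        public static void main(String[] args) {
            System.out.println(resolve("MRVer2"));   // MRVer2
            System.out.println(resolve("bogus"));    // MRVer1 (fallback)
            System.out.println(resolve(null));       // MRVer1
        }
    }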

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
index 4529f8b..d5ed97a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFWriteNotCompletedException.java
@@ -21,16 +21,16 @@ package org.apache.eagle.jpm.mr.history.parser;
 /**
 * Used to warn that a job history file has not yet been completely written to HDFS.
 * This happens when the feeder catches up before the history file has been fully written to HDFS.
- * @author yonzhang
  *
+ * @author yonzhang
  */
 public class JHFWriteNotCompletedException extends Exception {
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = -3060175780718218490L;
 
-    public JHFWriteNotCompletedException(String msg){
+    public JHFWriteNotCompletedException(String msg) {
         super(msg);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
index 0334c9b..4163f7b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.JobConfigurationAPIEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
@@ -32,10 +32,10 @@ import java.util.List;
 public class JobConfigurationCreationServiceListener implements HistoryJobEntityLifecycleListener {
     private static final Logger logger = LoggerFactory.getLogger(JobConfigurationCreationServiceListener.class);
     private static final int MAX_RETRY_TIMES = 3;
-    private JHFConfigManager configManager;
+    private MRHistoryJobConfig configManager;
     private JobConfigurationAPIEntity m_jobConfigurationEntity;
 
-    public JobConfigurationCreationServiceListener(JHFConfigManager configManager) {
+    public JobConfigurationCreationServiceListener(MRHistoryJobConfig configManager) {
         this.configManager = configManager;
     }
 
@@ -43,7 +43,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
     public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
         if (entity != null) {
             if (entity instanceof JobConfigurationAPIEntity) {
-                this.m_jobConfigurationEntity = (JobConfigurationAPIEntity)entity;
+                this.m_jobConfigurationEntity = (JobConfigurationAPIEntity) entity;
             }
         }
     }
@@ -55,13 +55,13 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
 
     @Override
     public void flush() throws Exception {
-        JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-        JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+        MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
         IEagleServiceClient client = new EagleServiceClientImpl(
-                eagleServiceConfig.eagleServiceHost,
-                eagleServiceConfig.eagleServicePort,
-                eagleServiceConfig.username,
-                eagleServiceConfig.password);
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
         client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
         List<JobConfigurationAPIEntity> list = new ArrayList<>();
@@ -76,7 +76,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
                 break;
             } catch (Exception ex) {
                 if (tried < MAX_RETRY_TIMES) {
-                    logger.error("Got exception to flush, retry as " + (tried + 1) +" times",ex);
+                    logger.error("Got exception to flush, will retry (attempt " + (tried + 1) + " of " + MAX_RETRY_TIMES + ")", ex);
                 } else {
                     logger.error("Got exception to flush, reached max retry times " + MAX_RETRY_TIMES, ex);
                 }
@@ -86,7 +86,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
                 client.getJerseyClient().destroy();
                 client.close();
             }
-            tried ++;
+            tried++;
         }
     }
 }
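
The flush() above opens an Eagle service client and retries the write up to MAX_RETRY_TIMES. Here is a minimal sketch of that bounded-retry shape, with the Eagle client replaced by a hypothetical Flusher interface so the example is self-contained (RetryFlushDemo and Flusher are illustrative names, not part of this patch). Note that this listener only logs once retries are exhausted, while TaskFailureListener further down rethrows; the sketch follows the rethrow variant:

    import java.util.Arrays;
    import java.util.List;

    public class RetryFlushDemo {

        interface Flusher {
            void create(List<String> entities) throws Exception;
        }

        static final int MAX_RETRY_TIMES = 3;

        static void flushWithRetry(Flusher client, List<String> entities) throws Exception {
            int tried = 0;
            while (tried <= MAX_RETRY_TIMES) {
                try {
                    client.create(entities);
                    return; // success
                } catch (Exception ex) {
                    if (tried < MAX_RETRY_TIMES) {
                        System.err.println("flush failed, will retry (attempt " + (tried + 1) + ")");
                    } else {
                        throw ex; // retries exhausted
                    }
                }
                tried++;
            }
        }

        public static void main(String[] args) throws Exception {
            int[] failures = {2}; // fail twice, then succeed
            flushWithRetry(list -> {
                if (failures[0]-- > 0) {
                    throw new RuntimeException("transient service error");
                }
                System.out.println("flushed " + list.size() + " entities");
            }, Arrays.asList("jobConfigurationEntity"));
        }
    }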

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index e1a3c69..abddf3b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.historyentity.*;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
@@ -34,23 +34,24 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
     private static final int BATCH_SIZE = 1000;
     private int batchSize;
     private List<JobBaseAPIEntity> list = new ArrayList<>();
-    private JHFConfigManager configManager;
+    private MRHistoryJobConfig configManager;
     List<JobExecutionAPIEntity> jobs = new ArrayList<>();
     List<JobEventAPIEntity> jobEvents = new ArrayList<>();
     List<TaskExecutionAPIEntity> taskExecs = new ArrayList<>();
     List<TaskAttemptExecutionAPIEntity> taskAttemptExecs = new ArrayList<>();
-    
-    public JobEntityCreationEagleServiceListener(JHFConfigManager configManager){
+
+    public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager) {
         this(configManager, BATCH_SIZE);
     }
-    
-    public JobEntityCreationEagleServiceListener(JHFConfigManager configManager, int batchSize) {
+
+    public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager, int batchSize) {
         this.configManager = configManager;
-        if (batchSize <= 0)
+        if (batchSize <= 0) {
             throw new IllegalArgumentException("batchSize must be greater than 0 when it is provided");
+        }
         this.batchSize = batchSize;
     }
-    
+
     @Override
     public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
         list.add(entity);
@@ -65,26 +66,26 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
      */
     @Override
     public void flush() throws Exception {
-        JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-        JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+        MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
         IEagleServiceClient client = new EagleServiceClientImpl(
-                eagleServiceConfig.eagleServiceHost,
-                eagleServiceConfig.eagleServicePort,
-                eagleServiceConfig.username,
-                eagleServiceConfig.password);
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
         client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
         logger.info("start flushing entities of total number " + list.size());
         for (int i = 0; i < list.size(); i++) {
             JobBaseAPIEntity entity = list.get(i);
             if (entity instanceof JobExecutionAPIEntity) {
-                jobs.add((JobExecutionAPIEntity)entity);
-            } else if(entity instanceof JobEventAPIEntity) {
-                jobEvents.add((JobEventAPIEntity)entity);
-            } else if(entity instanceof TaskExecutionAPIEntity) {
-                taskExecs.add((TaskExecutionAPIEntity)entity);
-            } else if(entity instanceof TaskAttemptExecutionAPIEntity) {
-                taskAttemptExecs.add((TaskAttemptExecutionAPIEntity)entity);
+                jobs.add((JobExecutionAPIEntity) entity);
+            } else if (entity instanceof JobEventAPIEntity) {
+                jobEvents.add((JobEventAPIEntity) entity);
+            } else if (entity instanceof TaskExecutionAPIEntity) {
+                taskExecs.add((TaskExecutionAPIEntity) entity);
+            } else if (entity instanceof TaskAttemptExecutionAPIEntity) {
+                taskAttemptExecs.add((TaskAttemptExecutionAPIEntity) entity);
             }
         }
         GenericServiceAPIResponseEntity result;
@@ -117,7 +118,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
         client.getJerseyClient().destroy();
         client.close();
     }
-    
+
     private void checkResult(GenericServiceAPIResponseEntity result) throws Exception {
         if (!result.isSuccess()) {
             logger.error(result.getException());
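
JobEntityCreationEagleServiceListener buffers incoming entities and writes them out in batches of batchSize (defaulting to BATCH_SIZE = 1000). A minimal sketch of that buffer-then-flush shape, using String entities and a print statement in place of the Eagle client call (BatchingListenerDemo is an illustrative name):

    import java.util.ArrayList;
    import java.util.List;

    public class BatchingListenerDemo {

        private final int batchSize;
        private final List<String> buffer = new ArrayList<>();

        BatchingListenerDemo(int batchSize) {
            if (batchSize <= 0) {
                throw new IllegalArgumentException("batchSize must be greater than 0");
            }
            this.batchSize = batchSize;
        }

        void jobEntityCreated(String entity) {
            buffer.add(entity);
            if (buffer.size() >= batchSize) {
                flush(); // buffer full, write the batch out
            }
        }

        void flush() {
            // the real listener calls client.create(...) here
            System.out.println("flushing " + buffer.size() + " entities");
            buffer.clear();
        }

        public static void main(String[] args) {
            BatchingListenerDemo listener = new BatchingListenerDemo(2);
            listener.jobEntityCreated("job1");
            listener.jobEntityCreated("job2"); // triggers a flush of [job1, job2]
            listener.jobEntityCreated("job3");
            listener.flush(); // flush the remainder on shutdown
        }
    }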

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
index 6e0371c..8b85b26 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
@@ -24,21 +24,22 @@ import java.util.Vector;
 
 /**
  * Not thread safe.
- * @author yonzhang
  *
+ * @author yonzhang
  */
 public class JobEntityCreationPublisher {
     private Vector<HistoryJobEntityCreationListener> listeners = new Vector<>(2);
-    public void addListener(HistoryJobEntityCreationListener l){
+
+    public void addListener(HistoryJobEntityCreationListener l) {
         listeners.add(l);
     }
-    
+
     public void notifiyListeners(JobBaseAPIEntity entity) throws Exception {
         for (HistoryJobEntityCreationListener l : listeners) {
             l.jobEntityCreated(entity);
         }
     }
-    
+
     public void flush() throws Exception {
         for (HistoryJobEntityCreationListener l : listeners) {
             l.flush();
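
The publisher simply fans each created entity out to every registered listener and propagates flush(). A self-contained sketch follows (CreationListener and PublisherDemo are stand-ins for HistoryJobEntityCreationListener and the publisher above). Note that Vector only synchronizes individual operations, which is why the class is still documented as not thread safe: iterating while another thread mutates the list is unprotected.

    import java.util.Vector;

    public class PublisherDemo {

        interface CreationListener {
            void jobEntityCreated(String entity) throws Exception;

            void flush() throws Exception;
        }

        private final Vector<CreationListener> listeners = new Vector<>(2);

        void addListener(CreationListener l) {
            listeners.add(l);
        }

        void notifyListeners(String entity) throws Exception {
            for (CreationListener l : listeners) {
                l.jobEntityCreated(entity); // every listener sees every entity
            }
        }

        void flush() throws Exception {
            for (CreationListener l : listeners) {
                l.flush();
            }
        }

        public static void main(String[] args) throws Exception {
            PublisherDemo publisher = new PublisherDemo();
            publisher.addListener(new CreationListener() {
                public void jobEntityCreated(String entity) {
                    System.out.println("created: " + entity);
                }

                public void flush() {
                    System.out.println("flushed");
                }
            });
            publisher.notifyListeners("job_201501010000_0001");
            publisher.flush();
        }
    }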

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
index d97e2a3..594d8e2 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityLifecycleAggregator.java
@@ -53,10 +53,10 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
 
     @Override
     public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
-        if (entity != null ) {
+        if (entity != null) {
             if (entity instanceof TaskAttemptExecutionAPIEntity) {
                 taskAttemptEntityCreated((TaskAttemptExecutionAPIEntity) entity);
-            } else if(entity instanceof JobExecutionAPIEntity) {
+            } else if (entity instanceof JobExecutionAPIEntity) {
                 this.m_jobExecutionAPIEntity = (JobExecutionAPIEntity) entity;
             }
         }
@@ -74,24 +74,28 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
             JobCounters jobCounters = m_jobExecutionAPIEntity.getJobCounters();
 
             if (jobCounters == null) {
-                LOG.warn("no job counter found for "+this.m_jobExecutionAPIEntity);
+                LOG.warn("no job counter found for " + this.m_jobExecutionAPIEntity);
                 jobCounters = new JobCounters();
             }
 
-            Map<String,Map<String,Long>> counters = jobCounters.getCounters();
+            Map<String, Map<String, Long>> counters = jobCounters.getCounters();
 
-            Map<String,Long> mapTaskAttemptCounter = this.m_mapTaskAttemptCounterAgg.result();
-            if (mapTaskAttemptCounter == null) mapTaskAttemptCounter = new HashMap<>();
+            Map<String, Long> mapTaskAttemptCounter = this.m_mapTaskAttemptCounterAgg.result();
+            if (mapTaskAttemptCounter == null) {
+                mapTaskAttemptCounter = new HashMap<>();
+            }
             mapTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_mapAttemptDuration);
-            counters.put(Constants.MAP_TASK_ATTEMPT_COUNTER,mapTaskAttemptCounter);
+            counters.put(Constants.MAP_TASK_ATTEMPT_COUNTER, mapTaskAttemptCounter);
 
-            Map<String,Long> reduceTaskAttemptCounter = this.m_reduceTaskAttemptCounterAgg.result();
-            if (reduceTaskAttemptCounter == null) reduceTaskAttemptCounter = new HashMap<>();
+            Map<String, Long> reduceTaskAttemptCounter = this.m_reduceTaskAttemptCounterAgg.result();
+            if (reduceTaskAttemptCounter == null) {
+                reduceTaskAttemptCounter = new HashMap<>();
+            }
             reduceTaskAttemptCounter.put(Constants.TaskAttemptCounter.TASK_ATTEMPT_DURATION.toString(), this.m_reduceAttemptDuration);
-            counters.put(Constants.REDUCE_TASK_ATTEMPT_COUNTER,reduceTaskAttemptCounter);
+            counters.put(Constants.REDUCE_TASK_ATTEMPT_COUNTER, reduceTaskAttemptCounter);
 
             counters.put(Constants.MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_mapFileSystemCounterAgg.result());
-            counters.put(Constants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER,this.m_reduceFileSystemTaskCounterAgg.result());
+            counters.put(Constants.REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER, this.m_reduceFileSystemTaskCounterAgg.result());
 
             jobCounters.setCounters(counters);
 
@@ -122,9 +126,9 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
 
         ObjectMapper objectMapper = new ObjectMapper();
         try {
-            LOG.warn("Unknown task type of task attempt execution entity: "+objectMapper.writeValueAsString(entity));
+            LOG.warn("Unknown task type of task attempt execution entity: " + objectMapper.writeValueAsString(entity));
         } catch (Exception e) {
-            LOG.error(e.getMessage(),e);
+            LOG.error(e.getMessage(), e);
         }
     }
 
@@ -137,13 +141,14 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
 
     interface JobCounterAggregateFunction {
         void accumulate(Map<String, Long> mapCounters);
-        Map<String,Long> result();
+
+        Map<String, Long> result();
     }
 
     static class JobCounterSumFunction implements JobCounterAggregateFunction {
         final Map<String, Long> result;
 
-        public JobCounterSumFunction(){
+        public JobCounterSumFunction() {
             result = new HashMap<>();
         }
 
@@ -153,7 +158,7 @@ public class JobEntityLifecycleAggregator implements HistoryJobEntityLifecycleLi
         @Override
         public void accumulate(Map<String, Long> counters) {
             if (counters != null) {
-                for (Map.Entry<String,Long> taskEntry: counters.entrySet()) {
+                for (Map.Entry<String, Long> taskEntry : counters.entrySet()) {
                     String counterName = taskEntry.getKey();
                     long counterValue = taskEntry.getValue();
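
The aggregator folds per-attempt counter maps into one running total per counter name. The summing statement itself falls outside the hunk shown, so the sketch below uses Map.merge as a reasonable stand-in for it; CounterSumDemo and the HDFS_BYTES_READ sample values are illustrative only:

    import java.util.HashMap;
    import java.util.Map;

    public class CounterSumDemo {

        static final Map<String, Long> result = new HashMap<>();

        static void accumulate(Map<String, Long> counters) {
            if (counters != null) {
                for (Map.Entry<String, Long> e : counters.entrySet()) {
                    // add this attempt's value to the running total per counter name
                    result.merge(e.getKey(), e.getValue(), Long::sum);
                }
            }
        }

        public static void main(String[] args) {
            Map<String, Long> attempt1 = new HashMap<>();
            attempt1.put("HDFS_BYTES_READ", 100L);
            Map<String, Long> attempt2 = new HashMap<>();
            attempt2.put("HDFS_BYTES_READ", 250L);
            accumulate(attempt1);
            accumulate(attempt2);
            System.out.println(result); // {HDFS_BYTES_READ=350}
        }
    }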
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java
index a2f4c3a..e65ec17 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/MRErrorClassifier.java
@@ -28,13 +28,13 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 public final class MRErrorClassifier {
-    
+
     private final List<ErrorCategory> categories = new ArrayList<ErrorCategory>();
 
     public MRErrorClassifier(InputStream configStream) throws IOException {
         final BufferedReader reader = new BufferedReader(new InputStreamReader(configStream));
         String line;
-        while((line = reader.readLine()) != null) {
+        while ((line = reader.readLine()) != null) {
             line = line.trim();
             if (line.isEmpty() || line.startsWith("#")) {
                 continue;
@@ -58,7 +58,7 @@ public final class MRErrorClassifier {
             categories.add(category);
         }
     }
-    
+
     public List<ErrorCategory> getErrorCategories() {
         return categories;
     }
@@ -72,31 +72,36 @@ public final class MRErrorClassifier {
         }
         return "UNKNOWN";
     }
-    
+
     public static class ErrorCategory {
         private String name;
         private Pattern pattern;
         private boolean needTransform;
-        
+
         public String getName() {
             return name;
         }
+
         public void setName(String name) {
             this.name = name;
         }
+
         public Pattern getPattern() {
             return pattern;
         }
+
         public void setPattern(Pattern pattern) {
             this.pattern = pattern;
         }
+
         public boolean isNeedTransform() {
             return needTransform;
         }
+
         public void setNeedTransform(boolean needTransform) {
             this.needTransform = needTransform;
         }
-        
+
         public String classify(String error) {
             Matcher matcher = pattern.matcher(error);
             if (matcher.find()) {
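
MRErrorClassifier reads category definitions line by line (skipping blanks and '#' comments), compiles each to a Pattern, and labels an error with the first matching category, falling back to "UNKNOWN". A self-contained sketch under the assumption of a simple "name:regex" line format; the real MRErrorCategory.config format is not visible in this hunk, so that part is a guess:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.StringReader;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.regex.Pattern;

    public class ErrorClassifyDemo {

        static class ErrorCategory {
            final String name;
            final Pattern pattern;

            ErrorCategory(String name, Pattern pattern) {
                this.name = name;
                this.pattern = pattern;
            }
        }

        public static void main(String[] args) throws IOException {
            String config = "# sample categories\nOOM:java.lang.OutOfMemoryError\nDISK:No space left on device\n";
            List<ErrorCategory> categories = new ArrayList<>();
            BufferedReader reader = new BufferedReader(new StringReader(config));
            String line;
            while ((line = reader.readLine()) != null) {
                line = line.trim();
                if (line.isEmpty() || line.startsWith("#")) {
                    continue; // skip blanks and comments, as in the real parser
                }
                String[] parts = line.split(":", 2);
                categories.add(new ErrorCategory(parts[0], Pattern.compile(parts[1])));
            }

            String error = "Error: java.lang.OutOfMemoryError: Java heap space";
            String category = "UNKNOWN"; // fallback when nothing matches
            for (ErrorCategory c : categories) {
                if (c.pattern.matcher(error).find()) {
                    category = c.name;
                    break;
                }
            }
            System.out.println(category); // OOM
        }
    }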

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java
index d2ac944..be23051 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/RecordTypes.java
@@ -17,9 +17,10 @@
 */
 
 package org.apache.eagle.jpm.mr.history.parser;
+
 /**
- * Record types are identifiers for each line of log in history files. 
- * A record type appears as the first token in a single line of log. 
+ * Record types are identifiers for each line of a job history log file.
+ * A record type appears as the first token of a log line.
  */
 public enum RecordTypes {
     Jobtracker, Job, Task, MapAttempt, ReduceAttempt, Meta
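
Since the record type is the first token of a history log line, dispatching on a line is just a matter of splitting off that token and mapping it to the enum. A small sketch (the sample line is illustrative, not a verbatim history record):

    public class RecordTypeDemo {

        enum RecordTypes {
            Jobtracker, Job, Task, MapAttempt, ReduceAttempt, Meta
        }

        public static void main(String[] args) {
            String line = "Job JOBID=\"job_201501010000_0001\" JOBNAME=\"wordcount\"";
            String firstToken = line.split("\\s+", 2)[0]; // record type is the first token
            RecordTypes type = RecordTypes.valueOf(firstToken);
            System.out.println("record type: " + type); // Job
        }
    }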

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
index d819baf..efc43c5 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.TaskAttemptCounterAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
@@ -34,46 +34,48 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
     private static final Logger logger = LoggerFactory.getLogger(TaskAttemptCounterListener.class);
     private static final int BATCH_SIZE = 1000;
     private Map<CounterKey, CounterValue> counters = new HashMap<>();
-    private JHFConfigManager configManager;
+    private MRHistoryJobConfig configManager;
 
-    public TaskAttemptCounterListener(JHFConfigManager configManager) {
+    public TaskAttemptCounterListener(MRHistoryJobConfig configManager) {
         this.configManager = configManager;
     }
 
     private static class CounterKey {
         Map<String, String> tags = new HashMap<>();
         long timestamp;
-        
+
         @Override
         public boolean equals(Object thatKey) {
-            if (!(thatKey instanceof CounterKey))
+            if (!(thatKey instanceof CounterKey)) {
                 return false;
-            CounterKey that = (CounterKey)thatKey;
-            if (that.tags.equals(this.tags) && that.timestamp == this.timestamp)
+            }
+            CounterKey that = (CounterKey) thatKey;
+            if (that.tags.equals(this.tags) && that.timestamp == this.timestamp) {
                 return true;
+            }
             return false;
         }
-        
+
         @Override
-        public int hashCode(){
+        public int hashCode() {
             return tags.hashCode() ^ Long.valueOf(timestamp).hashCode();
         }
     }
-    
+
     private static class CounterValue {
         int totalCount;
         int failedCount;
         int killedCount;
     }
-    
+
     @Override
     public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
         if (!(entity instanceof TaskAttemptExecutionAPIEntity)) {
             return;
         }
-        
-        TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity)entity;
-        
+
+        TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity) entity;
+
         Map<String, String> tags = new HashMap<>();
         tags.put(MRJobTagName.SITE.toString(), e.getTags().get(MRJobTagName.SITE.toString()));
         tags.put(MRJobTagName.JOD_DEF_ID.toString(), e.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
@@ -85,21 +87,21 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
         CounterKey key = new CounterKey();
         key.tags = tags;
         key.timestamp = roundToMinute(e.getEndTime());
-        
+
         CounterValue value = counters.get(key);
         if (value == null) {
             value = new CounterValue();
             counters.put(key, value);
         }
-        
+
         if (e.getTaskStatus().equals(EagleTaskStatus.FAILED.name())) {
             value.failedCount++;
-        } else if(e.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) {
+        } else if (e.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) {
             value.killedCount++;
         }
         value.totalCount++;
     }
-    
+
     private long roundToMinute(long timestamp) {
         GregorianCalendar cal = new GregorianCalendar();
         cal.setTimeInMillis(timestamp);
@@ -107,16 +109,16 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
         cal.set(Calendar.MILLISECOND, 0);
         return cal.getTimeInMillis();
     }
-    
+
     @Override
     public void flush() throws Exception {
-        JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-        JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+        MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
         IEagleServiceClient client = new EagleServiceClientImpl(
-                eagleServiceConfig.eagleServiceHost,
-                eagleServiceConfig.eagleServicePort,
-                eagleServiceConfig.username,
-                eagleServiceConfig.password);
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
         client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
         List<TaskAttemptCounterAPIEntity> list = new ArrayList<>();
@@ -132,7 +134,7 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
             entity.setFailedCount(value.failedCount);
             entity.setKilledCount(value.killedCount);
             list.add(entity);
-            
+
             if (list.size() >= BATCH_SIZE) {
                 logger.info("start flushing TaskAttemptCounter " + list.size());
                 client.create(list);
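
The listener buckets attempt counts per minute: roundToMinute() zeroes the sub-minute fields so that all attempts ending within the same minute share one CounterKey. The hunk above shows the MILLISECOND reset, and the SECOND reset presumably sits just above the shown context; the sketch below assumes both:

    import java.util.Calendar;
    import java.util.GregorianCalendar;

    public class RoundToMinuteDemo {

        static long roundToMinute(long timestamp) {
            GregorianCalendar cal = new GregorianCalendar();
            cal.setTimeInMillis(timestamp);
            cal.set(Calendar.SECOND, 0);
            cal.set(Calendar.MILLISECOND, 0);
            return cal.getTimeInMillis();
        }

        public static void main(String[] args) {
            long t1 = 1_435_708_815_123L; // arbitrary sample timestamp (15s into a minute)
            long t2 = t1 + 30_000L;       // 30 seconds later, still the same minute
            System.out.println(roundToMinute(t1) == roundToMinute(t2)); // true
        }
    }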

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0bde482b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index 6e42fe2..e0c3c6b 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.TaskFailureCountAPIEntity;
@@ -44,94 +44,98 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
 
     private final List<TaskFailureCountAPIEntity> failureTasks = new ArrayList<TaskFailureCountAPIEntity>();
     private final MRErrorClassifier classifier;
-	private JHFConfigManager configManager;
+    private MRHistoryJobConfig configManager;
 
-    public TaskFailureListener(JHFConfigManager configManager) {
+    public TaskFailureListener(MRHistoryJobConfig configManager) {
         this.configManager = configManager;
-    	InputStream is = null;
-    	try {
-    		is = TaskFailureListener.class.getClassLoader().getResourceAsStream(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
+        InputStream is = null;
+        try {
+            is = TaskFailureListener.class.getClassLoader().getResourceAsStream(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
             URL url = TaskFailureListener.class.getClassLoader().getResource(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
             if (url != null) {
                 logger.info("Feeder is going to load configuration file: " + url.toString());
             }
-    		classifier = new MRErrorClassifier(is);
-    	} catch (IOException ex) {
-    		throw new RuntimeException("Can't find MRErrorCategory.config file to configure MRErrorCategory");
-    	} finally {
-    		if (is != null) {
-    			try {
-    				is.close();
-    			} catch (IOException e) {
-    			}
-    		}
-    	}
+            classifier = new MRErrorClassifier(is);
+        } catch (IOException ex) {
+            throw new RuntimeException("Can't find MRErrorCategory.config file to configure MRErrorCategory", ex);
+        } finally {
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (IOException e) {
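+                    // ignored: nothing more to do if closing the config stream fails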
+                }
+            }
+        }
     }
 
     @Override
     public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
-    	if (!(entity instanceof TaskAttemptExecutionAPIEntity))
-    		return;
+        if (!(entity instanceof TaskAttemptExecutionAPIEntity)) {
+            return;
+        }
 
-    	TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity)entity;
-    	// only store those killed or failed tasks
-    	if (!e.getTaskStatus().equals(EagleTaskStatus.FAILED.name()) && !e.getTaskStatus().equals(EagleTaskStatus.KILLED.name()))
-    		return;
+        TaskAttemptExecutionAPIEntity e = (TaskAttemptExecutionAPIEntity) entity;
+        // only store those killed or failed tasks
+        if (!e.getTaskStatus().equals(EagleTaskStatus.FAILED.name()) && !e.getTaskStatus().equals(EagleTaskStatus.KILLED.name())) {
+            return;
+        }
 
-    	TaskFailureCountAPIEntity failureTask = new TaskFailureCountAPIEntity();
-    	Map<String, String> tags = new HashMap<>();
-    	failureTask.setTags(tags);
-    	tags.put(MRJobTagName.SITE.toString(), e.getTags().get(MRJobTagName.SITE.toString()));
-    	tags.put(MRJobTagName.JOD_DEF_ID.toString(), e.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
-    	tags.put(MRJobTagName.RACK.toString(), e.getTags().get(MRJobTagName.RACK.toString()));
-    	tags.put(MRJobTagName.HOSTNAME.toString(), e.getTags().get(MRJobTagName.HOSTNAME.toString()));
-    	tags.put(MRJobTagName.JOB_ID.toString(), e.getTags().get(MRJobTagName.JOB_ID.toString()));
-    	tags.put(MRJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID());
-    	tags.put(MRJobTagName.TASK_TYPE.toString(), e.getTags().get(MRJobTagName.TASK_TYPE.toString()));
+        TaskFailureCountAPIEntity failureTask = new TaskFailureCountAPIEntity();
+        Map<String, String> tags = new HashMap<>();
+        failureTask.setTags(tags);
+        tags.put(MRJobTagName.SITE.toString(), e.getTags().get(MRJobTagName.SITE.toString()));
+        tags.put(MRJobTagName.JOD_DEF_ID.toString(), e.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
+        tags.put(MRJobTagName.RACK.toString(), e.getTags().get(MRJobTagName.RACK.toString()));
+        tags.put(MRJobTagName.HOSTNAME.toString(), e.getTags().get(MRJobTagName.HOSTNAME.toString()));
+        tags.put(MRJobTagName.JOB_ID.toString(), e.getTags().get(MRJobTagName.JOB_ID.toString()));
+        tags.put(MRJobTagName.TASK_ATTEMPT_ID.toString(), e.getTaskAttemptID());
+        tags.put(MRJobTagName.TASK_TYPE.toString(), e.getTags().get(MRJobTagName.TASK_TYPE.toString()));
 
-    	//TODO need optimize, match and then capture the data
-    	final String errCategory = classifier.classifyError(e.getError());
-    	tags.put(MRJobTagName.ERROR_CATEGORY.toString(), errCategory);
+        // TODO: needs optimization - match first, then capture the data
+        final String errCategory = classifier.classifyError(e.getError());
+        tags.put(MRJobTagName.ERROR_CATEGORY.toString(), errCategory);
 
-    	failureTask.setError(e.getError());
-    	failureTask.setFailureCount(1); // hard coded to 1 unless we do pre-aggregation in the future
-    	failureTask.setTimestamp(e.getTimestamp());
-    	failureTask.setTaskStatus(e.getTaskStatus());
-    	failureTasks.add(failureTask);
+        failureTask.setError(e.getError());
+        failureTask.setFailureCount(1); // hard coded to 1 unless we do pre-aggregation in the future
+        failureTask.setTimestamp(e.getTimestamp());
+        failureTask.setTaskStatus(e.getTaskStatus());
+        failureTasks.add(failureTask);
 
-    	if (failureTasks.size() >= BATCH_SIZE) flush();
+        if (failureTasks.size() >= BATCH_SIZE) {
+            flush();
+        }
     }
-    
+
     @Override
     public void flush() throws Exception {
-		JHFConfigManager.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-		JHFConfigManager.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
-		IEagleServiceClient client = new EagleServiceClientImpl(
-				eagleServiceConfig.eagleServiceHost,
-				eagleServiceConfig.eagleServicePort,
-				eagleServiceConfig.username,
-				eagleServiceConfig.password);
+        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
+        MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
+        IEagleServiceClient client = new EagleServiceClientImpl(
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
 
-		client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+        client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
 
-    	int tried = 0;
-    	while (tried <= MAX_RETRY_TIMES) {
-    		try {
-    			logger.info("start flushing entities of total number " + failureTasks.size());
-    			client.create(failureTasks);
-    			logger.info("finish flushing entities of total number " + failureTasks.size());
-    			failureTasks.clear();
-				break;
-    		} catch (Exception ex) {
-    			if (tried < MAX_RETRY_TIMES) {
-    				logger.error("Got exception to flush, retry as " + (tried + 1) + " times", ex);
-    			} else {
-    				logger.error("Got exception to flush, reach max retry times " + MAX_RETRY_TIMES, ex);
-    				throw ex;
-    			}
-    		}
-			tried ++;
-    	}
+        int tried = 0;
+        while (tried <= MAX_RETRY_TIMES) {
+            try {
+                logger.info("start flushing entities of total number " + failureTasks.size());
+                client.create(failureTasks);
+                logger.info("finish flushing entities of total number " + failureTasks.size());
+                failureTasks.clear();
+                break;
+            } catch (Exception ex) {
+                if (tried < MAX_RETRY_TIMES) {
+                    logger.error("Got exception to flush, will retry (attempt " + (tried + 1) + " of " + MAX_RETRY_TIMES + ")", ex);
+                } else {
+                    logger.error("Got exception to flush, reached max retry times " + MAX_RETRY_TIMES, ex);
+                    throw ex;
+                }
+            }
+            tried++;
+        }
         client.getJerseyClient().destroy();
         client.close();
     }