You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/10/18 07:43:56 UTC

incubator-eagle git commit: [EAGLE-632] clean up configuration for MR history feeder

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 3d6a29ec2 -> a781937e9


[EAGLE-632] clean up configuration for MR history feeder

Author: wujinhu <wu...@126.com>

Closes #522 from wujinhu/EAGLE-632.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/a781937e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/a781937e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/a781937e

Branch: refs/heads/master
Commit: a781937e978025b708ccc70d47ddaadc18668948
Parents: 3d6a29e
Author: wujinhu <wu...@126.com>
Authored: Tue Oct 18 15:43:44 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Tue Oct 18 15:43:44 2016 +0800

----------------------------------------------------------------------
 .../jpm/mr/history/MRHistoryJobApplication.java |  14 +-
 .../jpm/mr/history/MRHistoryJobConfig.java      |  62 +---
 .../crawler/DefaultJHFInputStreamCallback.java  |   2 +-
 .../history/crawler/JHFCrawlerDriverImpl.java   |   6 +-
 .../metrics/JobCountMetricsGenerator.java       |   2 +-
 .../mr/history/parser/JHFEventReaderBase.java   |  20 +-
 .../mr/history/parser/JHFMRVer1EventReader.java | 152 ----------
 .../jpm/mr/history/parser/JHFMRVer1Parser.java  | 283 -------------------
 .../parser/JHFMRVer1PerLineListener.java        |  39 ---
 .../mr/history/parser/JHFMRVer2EventReader.java |   1 -
 .../jpm/mr/history/parser/JHFParserFactory.java |  44 +--
 ...JobConfigurationCreationServiceListener.java |   2 +-
 .../JobEntityCreationEagleServiceListener.java  |   4 +-
 .../parser/TaskAttemptCounterListener.java      |   2 +-
 .../mr/history/parser/TaskFailureListener.java  |   2 +-
 .../jpm/mr/history/storm/JobHistorySpout.java   |  15 +-
 ....history.MRHistoryJobApplicationProvider.xml | 102 ++++---
 .../src/main/resources/application.conf         |  44 +--
 18 files changed, 131 insertions(+), 665 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
index beec938..de35678 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
@@ -59,16 +59,12 @@ public class MRHistoryJobApplication extends StormApplication {
         JobHistoryContentFilter filter = builder.build();
         //3. init topology
         TopologyBuilder topologyBuilder = new TopologyBuilder();
-        String spoutName = "mrHistoryJobExecutor";
-        int parallelism = jhfAppConf.getInt("envContextConfig.parallelismConfig." + spoutName);
-        int tasks = jhfAppConf.getInt("envContextConfig.tasks." + spoutName);
-        if (parallelism > tasks) {
-            parallelism = tasks;
-        }
+        String spoutName = "mrHistoryJobSpout";
+        int tasks = jhfAppConf.getInt("stormConfig.mrHistoryJobSpoutTasks");
         topologyBuilder.setSpout(
-            spoutName,
-            new JobHistorySpout(filter, config),
-            parallelism
+                spoutName,
+                new JobHistorySpout(filter, config),
+                tasks
         ).setNumTasks(tasks);
         return topologyBuilder.createTopology();
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
index 37df5ad..561c3d0 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
@@ -49,18 +49,6 @@ public class MRHistoryJobConfig implements Serializable {
 
     private JobHistoryEndpointConfig jobHistoryEndpointConfig;
 
-    public ControlConfig getControlConfig() {
-        return controlConfig;
-    }
-
-    private ControlConfig controlConfig;
-
-    public JobExtractorConfig getJobExtractorConfig() {
-        return jobExtractorConfig;
-    }
-
-    private JobExtractorConfig jobExtractorConfig;
-
     public EagleServiceConfig getEagleServiceConfig() {
         return eagleServiceConfig;
     }
@@ -79,25 +67,14 @@ public class MRHistoryJobConfig implements Serializable {
         public int zkSessionTimeoutMs;
         public int zkRetryTimes;
         public int zkRetryInterval;
-        public String zkPort;
     }
 
     public static class JobHistoryEndpointConfig implements Serializable {
         public String mrHistoryServerUrl;
         public String basePath;
         public Map<String, String> hdfs;
-    }
-
-    public static class ControlConfig implements Serializable {
-        public Class<? extends JobIdPartitioner> partitionerCls;
-        public boolean zeroBasedMonth;
         public String timeZone;
-    }
-
-    public static class JobExtractorConfig implements Serializable {
         public String site;
-        public String mrVersion;
-        public int readTimeoutSeconds;
     }
 
     public static class EagleServiceConfig implements Serializable {
@@ -105,6 +82,7 @@ public class MRHistoryJobConfig implements Serializable {
         public int eagleServicePort;
         public String username;
         public String password;
+        public int readTimeoutSeconds;
     }
 
     private static MRHistoryJobConfig manager = new MRHistoryJobConfig();
@@ -118,8 +96,6 @@ public class MRHistoryJobConfig implements Serializable {
         this.zkStateConfig = new ZKStateConfig();
         this.jobHistoryEndpointConfig = new JobHistoryEndpointConfig();
         this.jobHistoryEndpointConfig.hdfs = new HashMap<>();
-        this.controlConfig = new ControlConfig();
-        this.jobExtractorConfig = new JobExtractorConfig();
         this.eagleServiceConfig = new EagleServiceConfig();
         this.config = null;
     }
@@ -141,48 +117,34 @@ public class MRHistoryJobConfig implements Serializable {
      */
     private void init(Config config) {
         this.config = config;
-        //parse eagle job extractor
-        this.jobExtractorConfig.site = config.getString("jobExtractorConfig.site");
-        this.jobExtractorConfig.mrVersion = config.getString("jobExtractorConfig.mrVersion");
-        this.jobExtractorConfig.readTimeoutSeconds = config.getInt("jobExtractorConfig.readTimeOutSeconds");
+
         //parse eagle zk
         this.zkStateConfig.zkQuorum = config.getString("zkStateConfig.zkQuorum");
-        this.zkStateConfig.zkPort = config.getString("zkStateConfig.zkPort");
         this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zkStateConfig.zkSessionTimeoutMs");
         this.zkStateConfig.zkRetryTimes = config.getInt("zkStateConfig.zkRetryTimes");
         this.zkStateConfig.zkRetryInterval = config.getInt("zkStateConfig.zkRetryInterval");
         this.zkStateConfig.zkRoot = config.getString("zkStateConfig.zkRoot");
 
         //parse job history endpoint
+        this.jobHistoryEndpointConfig.site = config.getString("siteId");
         this.jobHistoryEndpointConfig.basePath = config.getString("endpointConfig.basePath");
         this.jobHistoryEndpointConfig.mrHistoryServerUrl = config.getString("endpointConfig.mrHistoryServerUrl");
         for (Map.Entry<String, ConfigValue> entry : config.getConfig("endpointConfig.hdfs").entrySet()) {
             this.jobHistoryEndpointConfig.hdfs.put(entry.getKey(), entry.getValue().unwrapped().toString());
         }
-        //parse control config
-        try {
-            this.controlConfig.partitionerCls = (Class<? extends JobIdPartitioner>) Class.forName(config.getString("controlConfig.partitionerCls"));
-            assert this.controlConfig.partitionerCls != null;
-        } catch (Exception e) {
-            LOG.warn("can not initialize partitioner class, use org.apache.eagle.jpm.util.DefaultJobIdPartitioner", e);
-            this.controlConfig.partitionerCls = DefaultJobIdPartitioner.class;
-        } finally {
-            LOG.info("Loaded partitioner class: {}", this.controlConfig.partitionerCls);
-        }
-        this.controlConfig.zeroBasedMonth = config.getBoolean("controlConfig.zeroBasedMonth");
-        this.controlConfig.timeZone = config.getString("controlConfig.timeZone");
+        this.jobHistoryEndpointConfig.timeZone = config.getString("endpointConfig.timeZone");
 
         // parse eagle service endpoint
-        this.eagleServiceConfig.eagleServiceHost = config.getString("eagleProps.eagleService.host");
-        String port = config.getString("eagleProps.eagleService.port");
+        this.eagleServiceConfig.eagleServiceHost = config.getString("eagleService.host");
+        String port = config.getString("eagleService.port");
         this.eagleServiceConfig.eagleServicePort = (port == null ? 8080 : Integer.parseInt(port));
-        this.eagleServiceConfig.username = config.getString("eagleProps.eagleService.username");
-        this.eagleServiceConfig.password = config.getString("eagleProps.eagleService.password");
+        this.eagleServiceConfig.username = config.getString("eagleService.username");
+        this.eagleServiceConfig.password = config.getString("eagleService.password");
+        this.eagleServiceConfig.readTimeoutSeconds = config.getInt("eagleService.readTimeOutSeconds");
 
         LOG.info("Successfully initialized MRHistoryJobConfig");
-        LOG.info("zookeeper.quorum: " + this.zkStateConfig.zkQuorum);
-        LOG.info("zookeeper.property.clientPort: " + this.zkStateConfig.zkPort);
-        LOG.info("eagle.service.host: " + this.eagleServiceConfig.eagleServiceHost);
-        LOG.info("eagle.service.port: " + this.eagleServiceConfig.eagleServicePort);
+        LOG.info("zkStateConfig.zkQuorum: " + this.zkStateConfig.zkQuorum);
+        LOG.info("eagleService.host: " + this.eagleServiceConfig.eagleServiceHost);
+        LOG.info("eagleService.port: " + this.eagleServiceConfig.eagleServicePort);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
index 14b93af..b491857 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
@@ -43,7 +43,7 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback {
         @SuppressWarnings("serial")
         Map<String, String> baseTags = new HashMap<String, String>() {
             {
-                put("site", MRHistoryJobConfig.get().getJobExtractorConfig().site);
+                put("site", MRHistoryJobConfig.get().getJobHistoryEndpointConfig().site);
             }
         };
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index a33abeb..ef38b43 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -53,7 +53,6 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
 
     private final JobProcessDate processDate = new JobProcessDate();
     private JHFInputStreamCallback reader;
-    protected boolean zeroBasedMonth = true;
 
     private JobHistoryLCM jhfLCM;
     private JobIdFilter jobFilter;
@@ -63,12 +62,11 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
 
     public JHFCrawlerDriverImpl(JHFInputStreamCallback reader,
                                 JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception {
-        this.zeroBasedMonth = MRHistoryJobConfig.get().getControlConfig().zeroBasedMonth;
         this.reader = reader;
         jhfLCM = historyLCM;//new JobHistoryDAOImpl(jobHistoryConfig);
         this.partitionId = partitionId;
         this.jobFilter = jobFilter;
-        timeZone = TimeZone.getTimeZone(MRHistoryJobConfig.get().getControlConfig().timeZone);
+        timeZone = TimeZone.getTimeZone(MRHistoryJobConfig.get().getJobHistoryEndpointConfig().timeZone);
         jobCountMetricsGenerator = new JobCountMetricsGenerator(timeZone);
     }
 
@@ -198,7 +196,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
     }
 
     private int getActualMonth(int month) {
-        return zeroBasedMonth ? processDate.month : processDate.month + 1;
+        return processDate.month + 1;
     }
 
     private static class JobProcessDate {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
index ac4a33d..f74f77a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
@@ -88,7 +88,7 @@ public class JobCountMetricsGenerator {
         @SuppressWarnings("serial")
         Map<String, String> baseTags = new HashMap<String, String>() {
             {
-                put("site", MRHistoryJobConfig.get().getJobExtractorConfig().site);
+                put("site", MRHistoryJobConfig.get().getJobHistoryEndpointConfig().site);
                 put(MRJobTagName.JOB_STATUS.toString(), state);
             }
         };

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index d452f59..8b35a56 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -18,12 +18,9 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.metrics.JobCounterMetricsGenerator;
-import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.eagle.jpm.mr.historyentity.*;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.JobNameNormalization;
@@ -534,4 +531,21 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
         }
         return false;
     }
+
+    public enum Keys {
+        JOBTRACKERID,
+        START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
+        LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
+        FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
+        ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
+        SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT,
+        TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS,
+        VIEW_JOB, MODIFY_JOB, JOB_QUEUE, RACK,
+
+        UBERISED, SPLIT_LOCATIONS, FAILED_DUE_TO_ATTEMPT, MAP_FINISH_TIME, PORT, RACK_NAME,
+
+        //For Artemis
+        WORKFLOW_ID, WORKFLOW_NAME, WORKFLOW_NODE_NAME, WORKFLOW_ADJACENCIES, WORKFLOW_TAGS,
+        SHUFFLE_PORT, LOCALITY, AVATAAR, FAIL_REASON
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
deleted file mode 100644
index e20836f..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.parser;
-
-import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
-import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
-import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
-import org.apache.eagle.jpm.util.jobcounter.JobCounters;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.CounterGroup;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.util.CountersStrings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * Listener holds all informations related to one whole job history file, so it's stateful and does not support multithreading.
- */
-public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer1PerLineListener {
-    private static final Logger logger = LoggerFactory.getLogger(JHFMRVer1EventReader.class);
-
-    /**
-     * baseTags stores the basic tag name values which might be used for persisting various entities.
-     * baseTags includes: cluster, datacenter and jobName
-     * baseTags are used for all job/task related entities
-     *
-     * @param baseTags
-     */
-    public JHFMRVer1EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
-        super(baseTags, configuration, filter);
-    }
-
-    @Override
-    public void handle(RecordTypes recType, Map<Keys, String> values)
-        throws Exception {
-        switch (recType) {
-            case Job:
-                handleJob(null, values, values.get(Keys.COUNTERS));
-                break;
-            case Task:
-                handleTask(RecordTypes.Task, null, values, values.get(Keys.COUNTERS));
-                break;
-            case MapAttempt:
-                ensureRackHostnameAfterAttemptFinish(values);
-                handleTask(RecordTypes.MapAttempt, null, values, values.get(Keys.COUNTERS));
-                break;
-            case ReduceAttempt:
-                ensureRackHostnameAfterAttemptFinish(values);
-                handleTask(RecordTypes.ReduceAttempt, null, values, values.get(Keys.COUNTERS));
-                break;
-            default:
-                // skip other types
-                ;
-        }
-    }
-
-    private void ensureRackHostnameAfterAttemptFinish(Map<Keys, String> values) {
-        // only care about attempt finish
-        if (values.get(Keys.FINISH_TIME) == null) {
-            return;
-        }
-        String hostname = null;
-        String rack = null;
-        // we get rack/hostname based on task's status
-        if (values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.SUCCESS.name())) {
-            // in CDH4.1.4, the hostname has the format of /default/rack128/some.server.address
-            // if not specified, the hostname has the format  of /default-rack/<hostname>
-            String decoratedHostname = values.get(Keys.HOSTNAME);
-            String[] tmp = decoratedHostname.split("/");
-            hostname = tmp[tmp.length - 1];
-            rack = tmp[tmp.length - 2];
-            host2RackMapping.put(hostname, rack);
-        } else if (values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.KILLED.name()) || values.get(Keys.TASK_STATUS).equals(EagleTaskStatus.FAILED.name())) {
-            hostname = values.get(Keys.HOSTNAME);
-            // make every effort to get RACK information
-            hostname = (hostname == null) ? "" : hostname;
-            rack = host2RackMapping.get(hostname);
-        }
-
-        values.put(Keys.HOSTNAME, hostname);
-        values.put(Keys.RACK, rack);
-    }
-
-    @Override
-    protected JobCounters parseCounters(Object value) throws IOException {
-        JobCounters jc = new JobCounters();
-        Map<String, Map<String, Long>> groups = new HashMap<String, Map<String, Long>>();
-        Counters counters = new Counters();
-        try {
-            CountersStrings.parseEscapedCompactString((String) value, counters);
-        } catch (Exception ex) {
-            logger.error("can not parse job history", ex);
-            throw new IOException(ex);
-        }
-        Iterator<CounterGroup> it = counters.iterator();
-        while (it.hasNext()) {
-            CounterGroup cg = it.next();
-
-            // hardcoded to exclude business level counters
-            if (!cg.getName().equals("org.apache.hadoop.mapreduce.FileSystemCounter")
-                && !cg.getName().equals("org.apache.hadoop.mapreduce.TaskCounter")
-                && !cg.getName().equals("org.apache.hadoop.mapreduce.JobCounter")
-                && !cg.getName().equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter")
-                && !cg.getName().equals("org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter")
-                && !cg.getName().equals("FileSystemCounters")                                      // for artemis
-                && !cg.getName().equals("org.apache.hadoop.mapred.Task$Counter")                   // for artemis
-                && !cg.getName().equals("org.apache.hadoop.mapreduce.lib.input.FileInputFormat$Counter")  // for artemis
-                && !cg.getName().equals("org.apache.hadoop.mapreduce.lib.input.FileOutputFormat$Counter")
-                ) {
-                continue;
-            }
-
-            groups.put(cg.getName(), new HashMap<String, Long>());
-            Map<String, Long> counterValues = groups.get(cg.getName());
-            logger.debug("groupname:" + cg.getName() + "(" + cg.getDisplayName() + ")");
-            Iterator<Counter> iterCounter = cg.iterator();
-            while (iterCounter.hasNext()) {
-                Counter c = iterCounter.next();
-                counterValues.put(c.getName(), c.getValue());
-                logger.debug(c.getName() + "=" + c.getValue() + "(" + c.getDisplayName() + ")");
-            }
-        }
-        jc.setCounters(groups);
-        return jc;
-    }
-
-    public JobExecutionAPIEntity jobExecution() {
-        return jobExecutionEntity;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
deleted file mode 100644
index 2f63703..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1Parser.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.parser;
-
-import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.text.ParseException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-
-public class JHFMRVer1Parser implements JHFParserBase {
-    private static final Logger logger = LoggerFactory.getLogger(JHFMRVer1Parser.class);
-    static final char LINE_DELIMITER_CHAR = '.';
-    static final char[] charsToEscape = new char[] {'"', '=', LINE_DELIMITER_CHAR};
-    static final String KEY = "(\\w+)";
-    // value is any character other than quote, but escaped quotes can be there
-    static final String VALUE = "[^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+";
-    static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");
-    static final String MAX_COUNTER_COUNT = "10000";
-
-    private JHFMRVer1EventReader reader;
-
-    public JHFMRVer1Parser(JHFMRVer1EventReader reader) {
-        this.reader = reader;
-    }
-
-    /**
-     * Parses history file and invokes Listener.handle() for
-     * each line of history. It can be used for looking through history
-     * files for specific items without having to keep whole history in memory.
-     *
-     * @throws IOException
-     */
-    @Override
-    public void parse(InputStream in) throws Exception, ParseException {
-        // set enough counter number as user may build more counters
-        System.setProperty("mapreduce.job.counters.max", MAX_COUNTER_COUNT);
-        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
-        try {
-            String line = null;
-            StringBuffer buf = new StringBuffer();
-
-            // Read the meta-info line. Note that this might be a jobinfo line for files written with older format
-            line = reader.readLine();
-
-            // Check if the file is empty
-            if (line == null) {
-                return;
-            }
-
-            // Get the information required for further processing
-            MetaInfoManager mgr = new MetaInfoManager(line);
-            boolean isEscaped = mgr.isValueEscaped();
-            String lineDelim = String.valueOf(mgr.getLineDelim());
-            String escapedLineDelim = StringUtils.escapeString(lineDelim, StringUtils.ESCAPE_CHAR, mgr.getLineDelim());
-
-            do {
-                buf.append(line);
-                if (!line.trim().endsWith(lineDelim) || line.trim().endsWith(escapedLineDelim)) {
-                    buf.append("\n");
-                    continue;
-                }
-                parseLine(buf.toString(), this.reader, isEscaped);
-                buf = new StringBuffer();
-            }
-            while ((line = reader.readLine()) != null);
-
-            // flush to tell listener that we have finished parsing
-            logger.info("finish parsing job history file and close");
-            this.reader.close();
-        } catch (Exception ex) {
-            logger.error("can not parse correctly ", ex);
-            throw ex;
-        } finally {
-            if (reader != null) {
-                reader.close();
-            }
-        }
-    }
-
-    private static void parseLine(String line, JHFMRVer1PerLineListener l, boolean isEscaped) throws Exception, ParseException {
-        // extract the record type
-        int idx = line.indexOf(' ');
-        String recType = line.substring(0, idx);
-        String data = line.substring(idx + 1, line.length());
-
-        Matcher matcher = pattern.matcher(data);
-        Map<Keys, String> parseBuffer = new HashMap<Keys, String>();
-
-        while (matcher.find()) {
-            String tuple = matcher.group(0);
-            String[] parts = StringUtils.split(tuple, StringUtils.ESCAPE_CHAR, '=');
-            String value = parts[1].substring(1, parts[1].length() - 1);
-            if (isEscaped) {
-                value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR, charsToEscape);
-            }
-            parseBuffer.put(Keys.valueOf(parts[0]), value);
-        }
-
-        try {
-            l.handle(RecordTypes.valueOf(recType), parseBuffer);
-        } catch (IllegalArgumentException ex) {
-            // somehow record type does not exist, but continue to run
-            logger.warn("record type does not exist " + recType, ex);
-        }
-
-        parseBuffer.clear();
-    }
-
-    /**
-     * Manages job-history's meta information such as version etc.
-     * Helps in logging version information to the job-history and recover
-     * version information from the history.
-     */
-    static class MetaInfoManager implements JHFMRVer1PerLineListener {
-        private long version = 0L;
-        private KeyValuePair pairs = new KeyValuePair();
-
-        public void close() {
-        }
-
-        // Extract the version of the history that was used to write the history
-        public MetaInfoManager(String line) throws Exception, ParseException {
-            if (null != line) {
-                // Parse the line
-                parseLine(line, this, false);
-            }
-        }
-
-        // Get the line delimiter
-        char getLineDelim() {
-            if (version == 0) {
-                return '"';
-            } else {
-                return LINE_DELIMITER_CHAR;
-            }
-        }
-
-        // Checks if the values are escaped or not
-        boolean isValueEscaped() {
-            // Note that the values are not escaped in version 0
-            return version != 0;
-        }
-
-        public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException {
-            // Check if the record is of type META
-            if (RecordTypes.Meta == recType) {
-                pairs.handle(values);
-                version = pairs.getLong(Keys.VERSION); // defaults to 0
-            }
-        }
-    }
-
-    /**
-     * Base class contais utility stuff to manage types key value pairs with enums.
-     */
-    static class KeyValuePair {
-        private Map<Keys, String> values = new HashMap<Keys, String>();
-
-        /**
-         * Get 'String' value for given key. Most of the places use Strings as
-         * values so the default get' method returns 'String'.  This method never returns
-         * null to ease on GUIs. if no value is found it returns empty string ""
-         *
-         * @param k
-         * @return if null it returns empty string - ""
-         */
-        public String get(Keys k) {
-            String s = values.get(k);
-            return s == null ? "" : s;
-        }
-
-        /**
-         * Convert value from history to int and return.
-         * if no value is found it returns 0.
-         *
-         * @param k key
-         */
-        public int getInt(Keys k) {
-            String s = values.get(k);
-            if (null != s) {
-                return Integer.parseInt(s);
-            }
-            return 0;
-        }
-
-        /**
-         * Convert value from history to int and return.
-         * if no value is found it returns 0.
-         *
-         * @param k
-         */
-        public long getLong(Keys k) {
-            String s = values.get(k);
-            if (null != s) {
-                return Long.parseLong(s);
-            }
-            return 0;
-        }
-
-        /**
-         * Set value for the key.
-         *
-         * @param k
-         * @param s
-         */
-        public void set(Keys k, String s) {
-            values.put(k, s);
-        }
-
-        /**
-         * Adds all values in the Map argument to its own values.
-         *
-         * @param m
-         */
-        public void set(Map<Keys, String> m) {
-            values.putAll(m);
-        }
-
-        /**
-         * Reads values back from the history, input is same Map as passed to Listener by parseHistory().
-         *
-         * @param values
-         */
-        public synchronized void handle(Map<Keys, String> values) {
-            set(values);
-        }
-
-        /**
-         * Returns Map containing all key-values.
-         */
-        public Map<Keys, String> getValues() {
-            return values;
-        }
-    }
-
-    /**
-     * Job history files contain key="value" pairs, where keys belong to this enum.
-     * It acts as a global namespace for all keys.
-     */
-    public static enum Keys {
-        JOBTRACKERID,
-        START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME,
-        LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES,
-        FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
-        ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
-        SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT,
-        TRACKER_NAME, STATE_STRING, VERSION, MAP_COUNTERS, REDUCE_COUNTERS,
-        VIEW_JOB, MODIFY_JOB, JOB_QUEUE, RACK,
-
-        UBERISED, SPLIT_LOCATIONS, FAILED_DUE_TO_ATTEMPT, MAP_FINISH_TIME, PORT, RACK_NAME,
-
-        //For Artemis
-        WORKFLOW_ID, WORKFLOW_NAME, WORKFLOW_NODE_NAME, WORKFLOW_ADJACENCIES, WORKFLOW_TAGS,
-        SHUFFLE_PORT, LOCALITY, AVATAAR, FAIL_REASON
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java
deleted file mode 100644
index 5d48d5d..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1PerLineListener.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.parser;
-
-import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Callback interface for reading back log events from JobHistory. This interface
- * should be implemented and passed to JobHistory.parseHistory()
- */
-public interface JHFMRVer1PerLineListener {
-    /**
-     * Callback method for history parser.
-     *
-     * @param recType type of record, which is the first entry in the line.
-     * @param values  a map of key-value pairs as thry appear in history.
-     * @throws IOException
-     */
-    void handle(RecordTypes recType, Map<Keys, String> values) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
index 0919aa0..5324c02 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -19,7 +19,6 @@
 package org.apache.eagle.jpm.mr.history.parser;
 
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
-import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.eagle.jpm.util.jobcounter.JobCounters;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
index 56fd956..fee9394 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
@@ -31,41 +30,14 @@ public class JHFParserFactory {
     private static final Logger LOG = LoggerFactory.getLogger(JHFParserFactory.class);
 
     public static JHFParserBase getParser(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
-        String format = MRHistoryJobConfig.get().getJobExtractorConfig().mrVersion;
-        JHFParserBase parser;
-        JHFFormat f;
-        try {
-            if (format == null) {
-                f = JHFFormat.MRVer1;
-            } else {
-                f = JHFFormat.valueOf(format);
-            }
-        } catch (IllegalArgumentException ex) {
-            f = JHFFormat.MRVer1; // fall back to version 1 unless it's specified as version 2
-        }
-
-        switch (f) {
-            case MRVer2:
-                JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter);
-                reader2.addListener(new JobEntityCreationEagleServiceListener());
-                reader2.addListener(new TaskFailureListener());
-                reader2.addListener(new TaskAttemptCounterListener());
-                reader2.addListener(new JobConfigurationCreationServiceListener());
-
-                reader2.register(new JobEntityLifecycleAggregator());
-                parser = new JHFMRVer2Parser(reader2);
-                break;
-            case MRVer1:
-            default:
-                JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter);
-                reader1.addListener(new JobEntityCreationEagleServiceListener());
-                reader1.addListener(new TaskFailureListener());
-                reader1.addListener(new TaskAttemptCounterListener());
-
-                reader1.register(new JobEntityLifecycleAggregator());
-                parser = new JHFMRVer1Parser(reader1);
-                break;
-        }
+        JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter);
+        reader2.addListener(new JobEntityCreationEagleServiceListener());
+        reader2.addListener(new TaskFailureListener());
+        reader2.addListener(new TaskAttemptCounterListener());
+        reader2.addListener(new JobConfigurationCreationServiceListener());
+
+        reader2.register(new JobEntityLifecycleAggregator());
+        JHFParserBase parser = new JHFMRVer2Parser(reader2);
         return parser;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
index bf93432..28020b0 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
@@ -59,7 +59,7 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
             MRHistoryJobConfig.get().getEagleServiceConfig().username,
             MRHistoryJobConfig.get().getEagleServiceConfig().password);
 
-        client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getJobExtractorConfig().readTimeoutSeconds * 1000);
+        client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getEagleServiceConfig().readTimeoutSeconds * 1000);
         List<JobConfigurationAPIEntity> list = new ArrayList<>();
         list.add(jobConfigurationEntity);
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index 74368a5..67cfb71 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -56,7 +56,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
             throw new IllegalArgumentException("batchSize must be greater than 0 when it is provided");
         }
         this.batchSize = batchSize;
-        timeZone = TimeZone.getTimeZone(MRHistoryJobConfig.get().getControlConfig().timeZone);
+        timeZone = TimeZone.getTimeZone(MRHistoryJobConfig.get().getJobHistoryEndpointConfig().timeZone);
     }
 
     @Override
@@ -88,7 +88,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
             MRHistoryJobConfig.get().getEagleServiceConfig().username,
             MRHistoryJobConfig.get().getEagleServiceConfig().password);
 
-        client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getJobExtractorConfig().readTimeoutSeconds * 1000);
+        client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getEagleServiceConfig().readTimeoutSeconds * 1000);
         logger.info("start flushing entities of total number " + list.size());
         List<GenericMetricEntity> metricEntities = new ArrayList<>();
         for (int i = 0; i < list.size(); i++) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
index ef7c8e9..e663b29 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
@@ -116,7 +116,7 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
             MRHistoryJobConfig.get().getEagleServiceConfig().username,
             MRHistoryJobConfig.get().getEagleServiceConfig().password);
 
-        client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getJobExtractorConfig().readTimeoutSeconds * 1000);
+        client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getEagleServiceConfig().readTimeoutSeconds * 1000);
         List<TaskAttemptCounterAPIEntity> list = new ArrayList<>();
         logger.info("start flushing TaskAttemptCounter entities of total number " + counters.size());
         // create entity

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index 0c18133..ef62a6c 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -114,7 +114,7 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
             MRHistoryJobConfig.get().getEagleServiceConfig().username,
             MRHistoryJobConfig.get().getEagleServiceConfig().password);
 
-        client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getJobExtractorConfig().readTimeoutSeconds * 1000);
+        client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getEagleServiceConfig().readTimeoutSeconds * 1000);
 
         int tried = 0;
         while (tried <= MAX_RETRY_TIMES) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index 8ad3284..88cd100 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -22,6 +22,7 @@ import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.*;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
 import org.apache.eagle.jpm.mr.historyentity.JobProcessTimeStampEntity;
+import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
 import org.apache.eagle.jpm.util.JobIdFilter;
 import org.apache.eagle.jpm.util.JobIdFilterByPartition;
 import org.apache.eagle.jpm.util.JobIdPartitioner;
@@ -134,15 +135,7 @@ public class JobHistorySpout extends BaseRichSpout {
             throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId "
                 + partitionId + " and numTotalPartitions " + numTotalPartitions);
         }
-        Class<? extends JobIdPartitioner> partitionerCls = MRHistoryJobConfig.get().getControlConfig().partitionerCls;
-        JobIdPartitioner partitioner;
-        try {
-            partitioner = partitionerCls.newInstance();
-        } catch (Exception e) {
-            LOG.error("failing instantiating job partitioner class " + partitionerCls, e);
-            throw new IllegalStateException(e);
-        }
-        JobIdFilter jobIdFilter = new JobIdFilterByPartition(partitioner, numTotalPartitions, partitionId);
+        JobIdFilter jobIdFilter = new JobIdFilterByPartition(new DefaultJobIdPartitioner(), numTotalPartitions, partitionId);
         JobHistoryZKStateManager.instance().init(MRHistoryJobConfig.get().getZkStateConfig());
         JobHistoryZKStateManager.instance().ensureJobPartitions(numTotalPartitions);
         interceptor.setSpoutOutputCollector(collector);
@@ -232,7 +225,7 @@ public class JobHistorySpout extends BaseRichSpout {
         LOG.info("update process time stamp {}", minTimeStamp);
         Map<String, String> baseTags = new HashMap<String, String>() {
             {
-                put("site", MRHistoryJobConfig.get().getJobExtractorConfig().site);
+                put("site", MRHistoryJobConfig.get().getJobHistoryEndpointConfig().site);
             }
         };
         JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity();
@@ -246,7 +239,7 @@ public class JobHistorySpout extends BaseRichSpout {
             MRHistoryJobConfig.get().getEagleServiceConfig().username,
             MRHistoryJobConfig.get().getEagleServiceConfig().password);
 
-        client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getJobExtractorConfig().readTimeoutSeconds * 1000);
+        client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getEagleServiceConfig().readTimeoutSeconds * 1000);
 
         List<JobProcessTimeStampEntity> entities = new ArrayList<>();
         entities.add(entity);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
index 66111eb..9835411 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
@@ -23,81 +23,87 @@
     <configuration>
         <!-- org.apache.eagle.jpm.mr.history.MRHistoryJobConfig -->
         <property>
-            <name>jobExtractorConfig.site</name>
-            <displayName>Site ID</displayName>
-            <value>sandbox</value>
-        </property>
-        <property>
             <name>workers</name>
-            <displayName>storm worker number</displayName>
-            <value>4</value>
-        </property>
-        <property>
-            <name>envContextConfig.parallelismConfig.mrHistoryJobExecutor</name>
+            <displayName>Worker Number</displayName>
+            <description>the number of storm workers will be used</description>
             <value>4</value>
         </property>
         <property>
-            <name>envContextConfig.tasks.mrHistoryJobExecutor</name>
+            <name>stormConfig.mrHistoryJobSpoutTasks</name>
+            <displayName>Read Task Number</displayName>
+            <description>the number tasks of the spout will be assigned</description>
             <value>4</value>
         </property>
-        <property>
-            <name>jobExtractorConfig.mrVersion</name>
-            <value>MRVer2</value>
-        </property>
-        <property>
-            <name>jobExtractorConfig.readTimeOutSeconds</name>
-            <value>60</value>
-        </property>
+
         <property>
             <name>zkStateConfig.zkQuorum</name>
+            <displayName>Zookeeper Servers</displayName>
+            <description>list of zookeeper servers to connect to</description>
             <value>sandbox.hortonworks.com:2181</value>
-        </property>
-        <property>
-            <name>zkStateConfig.zkPort</name>
-            <value>2181</value>
+            <required>true</required>
         </property>
         <property>
             <name>zkStateConfig.zkRoot</name>
+            <displayName>Zookeeper Root Path</displayName>
+            <description>zkRoot that used to save context for this application</description>
             <value>/mrjobhistory_sandbox</value>
+            <required>true</required>
         </property>
         <property>
             <name>zkStateConfig.zkSessionTimeoutMs</name>
+            <displayName>Zookeeper Session Timeout</displayName>
+            <description>session timeout</description>
             <value>15000</value>
+            <required>true</required>
         </property>
         <property>
             <name>zkStateConfig.zkRetryTimes</name>
+            <displayName>Zookeeper Retry Times</displayName>
+            <description>retry policy to use - retry times</description>
             <value>3</value>
+            <required>true</required>
         </property>
         <property>
             <name>zkStateConfig.zkRetryInterval</name>
+            <displayName>Zookeeper Retry Interval</displayName>
+            <description>retry policy to use - retry interval</description>
             <value>20000</value>
+            <required>true</required>
         </property>
+
         <property>
-            <name>controlConfig.zeroBasedMonth</name>
-            <value>false</value>
-        </property>
-        <property>
-            <name>controlConfig.partitionerCls</name>
-            <value>org.apache.eagle.jpm.util.DefaultJobIdPartitioner</value>
-        </property>
-        <property>
-            <name>controlConfig.timeZone</name>
+            <name>endpointConfig.timeZone</name>
+            <displayName>Time Zone</displayName>
+            <description>which time zone do hdfs data nodes locate in</description>
             <value>Etc/GMT+7</value>
+            <required>true</required>
         </property>
         <property>
             <name>endpointConfig.mrHistoryServerUrl</name>
+            <displayName>Map Reduce History Server</displayName>
+            <description>map reduce history server url address</description>
             <value>http://sandbox.hortonworks.com:19888</value>
+            <required>true</required>
         </property>
         <property>
             <name>endpointConfig.basePath</name>
+            <displayName>Map Reduce History Log File Path</displayName>
+            <description>which directory do map reduce history job files locate in</description>
             <value>/mr-history/done</value>
+            <required>true</required>
         </property>
         <property>
             <name>endpointConfig.hdfs.fs.defaultFS</name>
+            <displayName>HDFS Address</displayName>
+            <description>The name of the default file system.  Either the literal string "local" or a host:port for NDFS</description>
             <value>hdfs://sandbox.hortonworks.com:8020</value>
+            <required>true</required>
         </property>
+
         <property>
             <name>MRConfigureKeys.jobConfigKey</name>
+            <displayName>Map Reduce Extracted Configuration Keys</displayName>
+            <description>which configures will be extracted from map reduce job configurations</description>
             <value>mapreduce.map.output.compress,
                 mapreduce.map.output.compress.codec,
                 mapreduce.output.fileoutputformat.compress,
@@ -112,27 +118,43 @@
         </property>
         <property>
             <name>MRConfigureKeys.jobNameKey</name>
+            <displayName>Map Reduce Job Name Key</displayName>
+            <description>User use -Dkey=value to specify name of a job when submit. use this to extract job name from job configuration</description>
             <value>eagle.job.name</value>
         </property>
         <property>
-            <name>eagleProps.eagleService.host</name>
-            <description>eagleProps.eagleService.host</description>
+            <name>eagleService.host</name>
+            <displayName>Eagle Server Host</displayName>
+            <description>eagle server host</description>
             <value>sandbox.hortonworks.com</value>
+            <required>true</required>
         </property>
         <property>
-            <name>eagleProps.eagleService.port</name>
-            <description>eagleProps.eagleService.port</description>
+            <name>eagleService.port</name>
+            <displayName>Eagle Server Port</displayName>
+            <description>eagle server port</description>
             <value>9099</value>
+            <required>true</required>
         </property>
         <property>
-            <name>eagleProps.eagleService.username</name>
-            <description>eagleProps.eagleService.username</description>
+            <name>eagleService.username</name>
+            <displayName>Eagle Server UserName</displayName>
+            <description>eagle server username used to login</description>
             <value>admin</value>
+            <required>true</required>
         </property>
         <property>
-            <name>eagleProps.eagleService.password</name>
-            <description>eagleProps.eagleService.password</description>
+            <name>eagleService.password</name>
+            <displayName>Eagle Server Password</displayName>
+            <description>eagle server password used to login</description>
             <value>secret</value>
+            <required>true</required>
+        </property>
+        <property>
+            <name>eagleService.readTimeOutSeconds</name>
+            <displayName>Eagle Server Read TimeOut(seconds)</displayName>
+            <description>eagle server read time out(seconds)</description>
+            <value>60</value>
         </property>
     </configuration>
     <docs>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a781937e/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index fa53dfb..3f84f0a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -14,37 +14,26 @@
 # limitations under the License.
 
 {
-  "envContextConfig" : {
-    "parallelismConfig" : {
-      "mrHistoryJobExecutor" : 6
-    },
-    "tasks" : {
-      "mrHistoryJobExecutor" : 6
-    }
-  },
+  "appId":"mrHistoryJob",
+  "mode":"LOCAL",
+  "workers" : 3,
+  "siteId" : "sandbox",
+  application.storm.nimbusHost=localhost
 
-  "jobExtractorConfig" : {
-    "site" : "sandbox",
-    "mrVersion": "MRVer2",
-    "readTimeOutSeconds" : 10
+  "stormConfig" : {
+    "mrHistoryJobSpoutTasks" : 6,
   },
 
   "zkStateConfig" : {
     "zkQuorum" : "sandbox.hortonworks.com:2181",
-    "zkPort" : "2181",
     "zkRoot" : "/test_mrjobhistory",
     "zkSessionTimeoutMs" : 15000,
     "zkRetryTimes" : 3,
     "zkRetryInterval" : 20000
   },
 
-  "controlConfig" : {
-    "zeroBasedMonth" : false,
-    "partitionerCls" : "org.apache.eagle.jpm.util.DefaultJobIdPartitioner",
-    "timeZone" : "UTC"
-  },
-
   "endpointConfig" : {
+    "timeZone" : "UTC",
     "mrHistoryServerUrl" : "http://sandbox.hortonworks.com:19888",
     "basePath" : "/mr-history/done",
     "hdfs" : {
@@ -56,19 +45,14 @@
     }
   },
 
-  "eagleProps" : {
-    "eagleService": {
-      "host": "sandbox.hortonworks.com",
-      "port": 9099,
-      "username": "admin",
-      "password": "secret"
-    }
+  "eagleService": {
+    "host": "sandbox.hortonworks.com",
+    "port": 9099,
+    "username": "admin",
+    "password": "secret",
+    "readTimeOutSeconds" : 10,
   },
 
-  "appId":"mrHistoryJob",
-  "mode":"LOCAL",
-  "workers" : 3,
-  application.storm.nimbusHost=localhost
   "MRConfigureKeys" : {
     "jobNameKey" : "eagle.job.name",
     "jobConfigKey" : "mapreduce.map.output.compress,mapreduce.map.output.compress.codec,mapreduce.output.fileoutputformat.compress,mapreduce.output.fileoutputformat.compress.type,mapreduce.output.fileoutputformat.compress.codec,mapred.output.format.class, dataplatform.etl.info,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.java.opts,mapreduce.reduce.java.opts"