You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:33 UTC

[37/52] [abbrv] incubator-eagle git commit: [EAGLE-524] aggregation framework-job level metrics aggregation

[EAGLE-524] aggregation framework-job level metrics aggregation

Author: wujinhu <wu...@126.com>

Closes #419 from wujinhu/aggregation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/8774b85c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/8774b85c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/8774b85c

Branch: refs/heads/master
Commit: 8774b85cd97b7b4c386c1b04cff36c13b3bb82d3
Parents: b66e27b
Author: wujinhu <wu...@126.com>
Authored: Tue Sep 6 17:22:42 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Tue Sep 6 17:22:42 2016 +0800

----------------------------------------------------------------------
 .../jpm/mr/history/MRHistoryJobApplication.java |   2 +-
 .../jpm/mr/history/MRHistoryJobConfig.java      |  24 +---
 .../crawler/DefaultJHFInputStreamCallback.java  |  11 +-
 .../history/crawler/JHFCrawlerDriverImpl.java   |  12 +-
 .../metrics/JobCountMetricsGenerator.java       |  18 +--
 .../metrics/JobCounterMetricsGenerator.java     | 133 +++++++++++++++++++
 .../JobExecutionMetricsCreationListener.java    |   4 +-
 .../mr/history/parser/JHFEventReaderBase.java   |  16 ++-
 .../mr/history/parser/JHFMRVer1EventReader.java |   5 +-
 .../mr/history/parser/JHFMRVer2EventReader.java |   5 +-
 .../jpm/mr/history/parser/JHFParserFactory.java |  22 +--
 ...JobConfigurationCreationServiceListener.java |  16 +--
 .../JobEntityCreationEagleServiceListener.java  |  22 ++-
 .../parser/TaskAttemptCounterListener.java      |  16 +--
 .../mr/history/parser/TaskFailureListener.java  |  16 +--
 .../jpm/mr/history/storm/JobHistorySpout.java   |  37 +++---
 .../org/apache/eagle/jpm/util/Constants.java    |   2 +
 17 files changed, 228 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
index 08607a1..beec938 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
@@ -67,7 +67,7 @@ public class MRHistoryJobApplication extends StormApplication {
         }
         topologyBuilder.setSpout(
             spoutName,
-            new JobHistorySpout(filter, appConfig),
+            new JobHistorySpout(filter, config),
             parallelism
         ).setNumTasks(tasks);
         return topologyBuilder.createTopology();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
index c0943de..4ac875b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
@@ -127,29 +127,19 @@ public class MRHistoryJobConfig implements Serializable {
         this.controlConfig = new ControlConfig();
         this.jobExtractorConfig = new JobExtractorConfig();
         this.eagleServiceConfig = new EagleServiceConfig();
-    }
-
-    public static MRHistoryJobConfig getInstance(String[] args) {
-        manager.init(args);
-        return manager;
+        this.config = null;
     }
 
     public static MRHistoryJobConfig getInstance(Config config) {
-        manager.init(config);
+        if (config != null && manager.config == null) {
+            manager.init(config);
+        }
+
         return manager;
     }
 
-    /**
-     * read configuration file and load hbase config etc.
-     */
-    private void init(String[] args) {
-        // TODO: Probably we can remove the properties file path check in future
-        try {
-            LOG.info("Loading from configuration file");
-            init(new ConfigOptionParser().load(args));
-        } catch (Exception e) {
-            LOG.error("failed to load config");
-        }
+    public static MRHistoryJobConfig get() {
+        return getInstance(null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
index 87cd4e0..14b93af 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
@@ -33,20 +33,17 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback {
 
 
     private JobHistoryContentFilter filter;
-    private MRHistoryJobConfig configManager;
 
-    public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, MRHistoryJobConfig configManager, EagleOutputCollector eagleCollector) {
+    public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, EagleOutputCollector eagleCollector) {
         this.filter = filter;
-        this.configManager = configManager;
     }
 
     @Override
     public void onInputStream(InputStream jobFileInputStream, org.apache.hadoop.conf.Configuration conf) throws Exception {
-        final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
         @SuppressWarnings("serial")
         Map<String, String> baseTags = new HashMap<String, String>() {
             {
-                put("site", jobExtractorConfig.site);
+                put("site", MRHistoryJobConfig.get().getJobExtractorConfig().site);
             }
         };
 
@@ -55,9 +52,7 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback {
             jobFileInputStream.close();
         } else {
             //get parser and parse, do not need to emit data now
-            JHFParserBase parser = JHFParserFactory.getParser(configManager,
-                    baseTags,
-                    conf, filter);
+            JHFParserBase parser = JHFParserFactory.getParser(baseTags, conf, filter);
             parser.parse(jobFileInputStream);
             jobFileInputStream.close();
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index 2f326fe..55ffc19 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -65,12 +65,10 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
     private TimeZone timeZone;
     private JobCountMetricsGenerator jobCountMetricsGenerator;
 
-    public JHFCrawlerDriverImpl(MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig,
-                                MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig,
-                                MRHistoryJobConfig.ControlConfig controlConfig, JHFInputStreamCallback reader,
+    public JHFCrawlerDriverImpl(JHFInputStreamCallback reader,
                                 JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception {
-        this.zeroBasedMonth = controlConfig.zeroBasedMonth;
-        this.dryRun = controlConfig.dryRun;
+        this.zeroBasedMonth = MRHistoryJobConfig.get().getControlConfig().zeroBasedMonth;
+        this.dryRun = MRHistoryJobConfig.get().getControlConfig().dryRun;
         if (this.dryRun)  {
             LOG.info("this is a dry run");
         }
@@ -78,8 +76,8 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
         jhfLCM = historyLCM;//new JobHistoryDAOImpl(jobHistoryConfig);
         this.partitionId = partitionId;
         this.jobFilter = jobFilter;
-        timeZone = TimeZone.getTimeZone(controlConfig.timeZone);
-        jobCountMetricsGenerator = new JobCountMetricsGenerator(eagleServiceConfig, jobExtractorConfig, timeZone);
+        timeZone = TimeZone.getTimeZone(MRHistoryJobConfig.get().getControlConfig().timeZone);
+        jobCountMetricsGenerator = new JobCountMetricsGenerator(timeZone);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
index 0e0e5e9..642170d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
@@ -34,15 +34,9 @@ import java.util.*;
 public class JobCountMetricsGenerator {
     private static final Logger LOG = LoggerFactory.getLogger(JobCountMetricsGenerator.class);
 
-    private MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig;
-    private MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig;
     private TimeZone timeZone;
 
-    public JobCountMetricsGenerator(MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig,
-                                    MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig,
-                                    TimeZone timeZone) {
-        this.eagleServiceConfig = eagleServiceConfig;
-        this.jobExtractorConfig = jobExtractorConfig;
+    public JobCountMetricsGenerator(TimeZone timeZone) {
         this.timeZone = timeZone;
     }
 
@@ -57,10 +51,10 @@ public class JobCountMetricsGenerator {
         }
 
         IEagleServiceClient client = new EagleServiceClientImpl(
-            eagleServiceConfig.eagleServiceHost,
-            eagleServiceConfig.eagleServicePort,
-            eagleServiceConfig.username,
-            eagleServiceConfig.password);
+            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
+            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort,
+            MRHistoryJobConfig.get().getEagleServiceConfig().username,
+            MRHistoryJobConfig.get().getEagleServiceConfig().password);
 
 
         GregorianCalendar cal = new GregorianCalendar(year, month, day);
@@ -72,7 +66,7 @@ public class JobCountMetricsGenerator {
         @SuppressWarnings("serial")
         Map<String, String> baseTags = new HashMap<String, String>() {
             {
-                put("site", jobExtractorConfig.site);
+                put("site", MRHistoryJobConfig.get().getJobExtractorConfig().site);
             }
         };
         metricEntity.setTags(baseTags);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java
new file mode 100644
index 0000000..6291b37
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCounterMetricsGenerator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.mr.history.metrics;
+
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class JobCounterMetricsGenerator {
+    private static final Logger LOG = LoggerFactory.getLogger(JobCounterMetricsGenerator.class);
+    private static final int BATCH_SIZE = 1000;
+
+    private List<List<GenericMetricEntity>> metricEntities = new ArrayList<>();
+    //metric, time, value
+    private Map<String, Map<Long, Long>> metricValueByMinute = new HashMap<>();
+
+    private List<GenericMetricEntity> lastEntitiesBatch;
+    private Map<String, String> baseTags;
+
+    public JobCounterMetricsGenerator() {
+        this.lastEntitiesBatch = null;
+    }
+
+    public void setBaseTags(Map<String, String> tags) {
+        this.baseTags = tags;
+    }
+
+    public void taskExecutionEntityCreated(TaskExecutionAPIEntity taskExecutionAPIEntity) {
+        JobCounters jobCounters = taskExecutionAPIEntity.getJobCounters();
+        if (jobCounters == null || jobCounters.getCounters() == null) {
+            LOG.warn("found null job counters, task {}", taskExecutionAPIEntity.getTags().get(MRJobTagName.TASK_ID.toString()));
+            return;
+        }
+
+        long duration = taskExecutionAPIEntity.getDuration();
+        long startTime = taskExecutionAPIEntity.getStartTime();
+        long endTime = taskExecutionAPIEntity.getEndTime();
+
+        Map<String, Map<String, Long>> counters = jobCounters.getCounters();
+        for (String groupName : counters.keySet()) {
+            Map<String, Long> metricValues = counters.get(groupName);
+            for (String metric : metricValues.keySet()) {
+                if (!metricValueByMinute.containsKey(metric)) {
+                    metricValueByMinute.put(metric, new HashMap<>());
+                }
+                Long value = metricValues.get(metric);
+                double avg = value * 1.0 / duration;
+                for (long i = startTime; i <= endTime;) {
+                    long timeStamp = i / 60000L * 60000L;
+                    if (!metricValueByMinute.get(metric).containsKey(timeStamp)) {
+                        metricValueByMinute.get(metric).put(timeStamp, 0L);
+                    }
+                    long valueByEachMinute = metricValueByMinute.get(metric).get(timeStamp);
+                    if (endTime >= timeStamp + 60000L) {
+                        metricValueByMinute.get(metric).put(timeStamp, valueByEachMinute + (long)(avg * (timeStamp + 60000L - i)));
+                    } else {
+                        metricValueByMinute.get(metric).put(timeStamp, valueByEachMinute + (long)(avg * (endTime - timeStamp)));
+                    }
+
+                    i = timeStamp + 60000L;
+                }
+            }
+        }
+    }
+
+    private String buildMetricName(String field) {
+        return String.format(Constants.HADOOP_HISTORY_MINUTE_METRIC_FORMAT, Constants.JOB_LEVEL, field);
+    }
+
+    public void flush() throws Exception {
+        for (String metric : metricValueByMinute.keySet()) {
+            Map<Long, Long> valueByMinute = metricValueByMinute.get(metric);
+            for (Long timeStamp : valueByMinute.keySet()) {
+                GenericMetricEntity metricEntity = new GenericMetricEntity();
+                metricEntity.setTimestamp(timeStamp);
+                metricEntity.setPrefix(buildMetricName(metric.toLowerCase()));
+                metricEntity.setValue(new double[] {valueByMinute.get(timeStamp)});
+                metricEntity.setTags(this.baseTags);
+
+                if (this.lastEntitiesBatch == null || this.lastEntitiesBatch.size() > BATCH_SIZE) {
+                    this.lastEntitiesBatch = new ArrayList<>();
+                    metricEntities.add(this.lastEntitiesBatch);
+                }
+
+                this.lastEntitiesBatch.add(metricEntity);
+            }
+        }
+
+        IEagleServiceClient client = new EagleServiceClientImpl(
+            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
+            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort,
+            MRHistoryJobConfig.get().getEagleServiceConfig().username,
+            MRHistoryJobConfig.get().getEagleServiceConfig().password);
+
+        for (List<GenericMetricEntity> entities : metricEntities) {
+            LOG.info("start flushing entities of total number " + entities.size());
+            client.create(entities);
+            LOG.info("finish flushing entities of total number " + entities.size());
+            entities.clear();
+        }
+        client.getJerseyClient().destroy();
+        client.close();
+        metricEntities.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
index 2129bed..d7e8fcc 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
@@ -57,7 +57,7 @@ public class JobExecutionMetricsCreationListener extends AbstractMetricsCreation
                 for (Map<String, Long> metricGroup : jobCounters.getCounters().values()) {
                     for (Map.Entry<String, Long> entry : metricGroup.entrySet()) {
                         String metricName = entry.getKey().toLowerCase();
-                        metrics.add(metricWrapper(timeStamp, "history." + metricName, new double[]{entry.getValue()}, tags));
+                        metrics.add(metricWrapper(timeStamp, metricName, new double[]{entry.getValue()}, tags));
                     }
                 }
             }
@@ -67,7 +67,7 @@ public class JobExecutionMetricsCreationListener extends AbstractMetricsCreation
 
     @Override
     public String buildMetricName(String field) {
-        return String.format(Constants.hadoopMetricFormat, Constants.JOB_LEVEL, field);
+        return String.format(Constants.HADOOP_HISTORY_TOTAL_METRIC_FORMAT, Constants.JOB_LEVEL, field);
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index 1570956..d33c26b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -18,9 +18,9 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.commons.io.FilenameUtils;
-import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
+import org.apache.eagle.jpm.mr.history.metrics.JobCounterMetricsGenerator;
 import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.eagle.jpm.mr.historyentity.*;
 import org.apache.eagle.jpm.util.Constants;
@@ -69,7 +69,6 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     protected String queueName;
     protected Long jobLaunchTime;
     protected JobHistoryContentFilter filter;
-    private JobHistoryEndpointConfig jobHistoryEndpointConfig;
 
     protected final List<HistoryJobEntityLifecycleListener> jobEntityLifecycleListeners = new ArrayList<>();
 
@@ -78,6 +77,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     private long sumMapTaskDuration;
     private long sumReduceTaskDuration;
 
+    private JobCounterMetricsGenerator jobCounterMetricsGenerator;
+
     public Constants.JobType fetchJobType(Configuration config) {
         if (config.get(Constants.JobConfiguration.CASCADING_JOB) != null) {
             return Constants.JobType.CASCADING;
@@ -101,9 +102,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
      *
      * @param baseTags
      */
-    public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, JobHistoryEndpointConfig jobHistoryEndpointConfig) {
+    public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
         this.filter = filter;
-        this.jobHistoryEndpointConfig = jobHistoryEndpointConfig;
 
         this.baseTags = baseTags;
         jobSubmitEventEntity = new JobEventAPIEntity();
@@ -134,6 +134,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
         }
         this.sumMapTaskDuration = 0L;
         this.sumReduceTaskDuration = 0L;
+        this.jobCounterMetricsGenerator = new JobCounterMetricsGenerator();
     }
 
     public void register(HistoryJobEntityLifecycleListener lifecycleListener) {
@@ -148,6 +149,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
         }
         try {
             flush();
+            this.jobCounterMetricsGenerator.flush();
         } catch (Exception ex) {
             throw new IOException(ex);
         }
@@ -162,7 +164,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     }
 
     private String buildJobTrackingUrl(String jobId) {
-        String jobTrackingUrlBase = this.jobHistoryEndpointConfig.mrHistoryServerUrl + "/jobhistory/job/";
+        String jobTrackingUrlBase = MRHistoryJobConfig.getInstance(null).getJobHistoryEndpointConfig().mrHistoryServerUrl + "/jobhistory/job/";
         try {
             URI oldUri = new URI(jobTrackingUrlBase);
             URI resolved = oldUri.resolve(jobId);
@@ -303,6 +305,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             } else {
                 jobExecutionEntity.setAvgReduceTaskDuration(this.sumReduceTaskDuration * 1.0 / numTotalReduces);
             }
+            this.jobCounterMetricsGenerator.setBaseTags(jobExecutionEntity.getTags());
             entityCreated(jobExecutionEntity);
         }
     }
@@ -401,6 +404,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             }
 
             entityCreated(entity);
+            this.jobCounterMetricsGenerator.taskExecutionEntityCreated(entity);
             //_taskStartTime.remove(taskID); // clean this taskID
         } else if ((recType == RecordTypes.MapAttempt || recType == RecordTypes.ReduceAttempt) && startTime != null) { // task attempt start
             taskAttemptStartTime.put(taskAttemptID, Long.valueOf(startTime));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
index 0e9458a..e20836f 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
@@ -49,8 +48,8 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
      *
      * @param baseTags
      */
-    public JHFMRVer1EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, JobHistoryEndpointConfig jobHistoryEndpointConfig) {
-        super(baseTags, configuration, filter, jobHistoryEndpointConfig);
+    public JHFMRVer1EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
+        super(baseTags, configuration, filter);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
index 74f84f6..0919aa0 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.eagle.jpm.util.jobcounter.JobCounters;
@@ -44,8 +43,8 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
      *
      * @throws IOException
      */
-    public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, JobHistoryEndpointConfig jobHistoryEndpointConfig) {
-        super(baseTags, configuration, filter, jobHistoryEndpointConfig);
+    public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
+        super(baseTags, configuration, filter);
     }
 
     @SuppressWarnings("deprecation")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
index 386d50c..56fd956 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -30,8 +30,8 @@ public class JHFParserFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(JHFParserFactory.class);
 
-    public static JHFParserBase getParser(MRHistoryJobConfig configManager, Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
-        String format = configManager.getJobExtractorConfig().mrVersion;
+    public static JHFParserBase getParser(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
+        String format = MRHistoryJobConfig.get().getJobExtractorConfig().mrVersion;
         JHFParserBase parser;
         JHFFormat f;
         try {
@@ -46,21 +46,21 @@ public class JHFParserFactory {
 
         switch (f) {
             case MRVer2:
-                JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter, configManager.getJobHistoryEndpointConfig());
-                reader2.addListener(new JobEntityCreationEagleServiceListener(configManager));
-                reader2.addListener(new TaskFailureListener(configManager));
-                reader2.addListener(new TaskAttemptCounterListener(configManager));
-                reader2.addListener(new JobConfigurationCreationServiceListener(configManager));
+                JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter);
+                reader2.addListener(new JobEntityCreationEagleServiceListener());
+                reader2.addListener(new TaskFailureListener());
+                reader2.addListener(new TaskAttemptCounterListener());
+                reader2.addListener(new JobConfigurationCreationServiceListener());
 
                 reader2.register(new JobEntityLifecycleAggregator());
                 parser = new JHFMRVer2Parser(reader2);
                 break;
             case MRVer1:
             default:
-                JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter, configManager.getJobHistoryEndpointConfig());
-                reader1.addListener(new JobEntityCreationEagleServiceListener(configManager));
-                reader1.addListener(new TaskFailureListener(configManager));
-                reader1.addListener(new TaskAttemptCounterListener(configManager));
+                JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter);
+                reader1.addListener(new JobEntityCreationEagleServiceListener());
+                reader1.addListener(new TaskFailureListener());
+                reader1.addListener(new TaskAttemptCounterListener());
 
                 reader1.register(new JobEntityLifecycleAggregator());
                 parser = new JHFMRVer1Parser(reader1);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
index 7293c89..bf93432 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
@@ -32,11 +32,9 @@ import java.util.List;
 public class JobConfigurationCreationServiceListener implements HistoryJobEntityLifecycleListener {
     private static final Logger logger = LoggerFactory.getLogger(JobConfigurationCreationServiceListener.class);
     private static final int MAX_RETRY_TIMES = 3;
-    private MRHistoryJobConfig configManager;
     private JobConfigurationAPIEntity jobConfigurationEntity;
 
-    public JobConfigurationCreationServiceListener(MRHistoryJobConfig configManager) {
-        this.configManager = configManager;
+    public JobConfigurationCreationServiceListener() {
     }
 
     @Override
@@ -55,15 +53,13 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
 
     @Override
     public void flush() throws Exception {
-        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-        MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
         IEagleServiceClient client = new EagleServiceClientImpl(
-            eagleServiceConfig.eagleServiceHost,
-            eagleServiceConfig.eagleServicePort,
-            eagleServiceConfig.username,
-            eagleServiceConfig.password);
+            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
+            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort,
+            MRHistoryJobConfig.get().getEagleServiceConfig().username,
+            MRHistoryJobConfig.get().getEagleServiceConfig().password);
 
-        client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+        client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getJobExtractorConfig().readTimeoutSeconds * 1000);
         List<JobConfigurationAPIEntity> list = new ArrayList<>();
         list.add(jobConfigurationEntity);
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index 623a776..74368a5 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -40,7 +40,6 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
     private static final int BATCH_SIZE = 1000;
     private int batchSize;
     private List<JobBaseAPIEntity> list = new ArrayList<>();
-    private MRHistoryJobConfig configManager;
     List<JobExecutionAPIEntity> jobs = new ArrayList<>();
     List<JobEventAPIEntity> jobEvents = new ArrayList<>();
     List<TaskExecutionAPIEntity> taskExecs = new ArrayList<>();
@@ -48,17 +47,16 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
     private JobExecutionMetricsCreationListener jobExecutionMetricsCreationListener = new JobExecutionMetricsCreationListener();
     private TimeZone timeZone;
 
-    public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager) {
-        this(configManager, BATCH_SIZE);
+    public JobEntityCreationEagleServiceListener() {
+        this(BATCH_SIZE);
     }
 
-    public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager, int batchSize) {
-        this.configManager = configManager;
+    public JobEntityCreationEagleServiceListener(int batchSize) {
         if (batchSize <= 0) {
             throw new IllegalArgumentException("batchSize must be greater than 0 when it is provided");
         }
         this.batchSize = batchSize;
-        timeZone = TimeZone.getTimeZone(configManager.getControlConfig().timeZone);
+        timeZone = TimeZone.getTimeZone(MRHistoryJobConfig.get().getControlConfig().timeZone);
     }
 
     @Override
@@ -84,15 +82,13 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
      */
     @Override
     public void flush() throws Exception {
-        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-        MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
         IEagleServiceClient client = new EagleServiceClientImpl(
-            eagleServiceConfig.eagleServiceHost,
-            eagleServiceConfig.eagleServicePort,
-            eagleServiceConfig.username,
-            eagleServiceConfig.password);
+            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
+            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort,
+            MRHistoryJobConfig.get().getEagleServiceConfig().username,
+            MRHistoryJobConfig.get().getEagleServiceConfig().password);
 
-        client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+        client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getJobExtractorConfig().readTimeoutSeconds * 1000);
         logger.info("start flushing entities of total number " + list.size());
         List<GenericMetricEntity> metricEntities = new ArrayList<>();
         for (int i = 0; i < list.size(); i++) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
index efc43c5..ef7c8e9 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
@@ -34,10 +34,8 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
     private static final Logger logger = LoggerFactory.getLogger(TaskAttemptCounterListener.class);
     private static final int BATCH_SIZE = 1000;
     private Map<CounterKey, CounterValue> counters = new HashMap<>();
-    private MRHistoryJobConfig configManager;
 
-    public TaskAttemptCounterListener(MRHistoryJobConfig configManager) {
-        this.configManager = configManager;
+    public TaskAttemptCounterListener() {
     }
 
     private static class CounterKey {
@@ -112,15 +110,13 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
 
     @Override
     public void flush() throws Exception {
-        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-        MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
         IEagleServiceClient client = new EagleServiceClientImpl(
-            eagleServiceConfig.eagleServiceHost,
-            eagleServiceConfig.eagleServicePort,
-            eagleServiceConfig.username,
-            eagleServiceConfig.password);
+            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
+            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort,
+            MRHistoryJobConfig.get().getEagleServiceConfig().username,
+            MRHistoryJobConfig.get().getEagleServiceConfig().password);
 
-        client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+        client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getJobExtractorConfig().readTimeoutSeconds * 1000);
         List<TaskAttemptCounterAPIEntity> list = new ArrayList<>();
         logger.info("start flushing TaskAttemptCounter entities of total number " + counters.size());
         // create entity

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index f95eaa2..1a7a5fc 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -44,10 +44,8 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
 
     private final List<TaskFailureCountAPIEntity> failureTasks = new ArrayList<TaskFailureCountAPIEntity>();
     private final MRErrorClassifier classifier;
-    private MRHistoryJobConfig configManager;
 
-    public TaskFailureListener(MRHistoryJobConfig configManager) {
-        this.configManager = configManager;
+    public TaskFailureListener() {
         InputStream is = null;
         try {
             is = TaskFailureListener.class.getClassLoader().getResourceAsStream(MR_ERROR_CATEGORY_CONFIG_FILE_NAME);
@@ -109,15 +107,13 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
 
     @Override
     public void flush() throws Exception {
-        MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-        MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
         IEagleServiceClient client = new EagleServiceClientImpl(
-            eagleServiceConfig.eagleServiceHost,
-            eagleServiceConfig.eagleServicePort,
-            eagleServiceConfig.username,
-            eagleServiceConfig.password);
+            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
+            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort,
+            MRHistoryJobConfig.get().getEagleServiceConfig().username,
+            MRHistoryJobConfig.get().getEagleServiceConfig().password);
 
-        client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+        client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getJobExtractorConfig().readTimeoutSeconds * 1000);
 
         int tried = 0;
         while (tried <= MAX_RETRY_TIMES) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index 04283d3..da98e0d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.storm;
 
+import com.typesafe.config.Config;
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.*;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
@@ -91,22 +92,22 @@ public class JobHistorySpout extends BaseRichSpout {
     private JobHistoryContentFilter contentFilter;
     private JobHistorySpoutCollectorInterceptor interceptor;
     private JHFInputStreamCallback callback;
-    private MRHistoryJobConfig configManager;
     private JobHistoryLCM jhfLCM;
     private static final int MAX_RETRY_TIMES = 3;
+    private Config config;
 
-    public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig configManager) {
-        this(filter, configManager, new JobHistorySpoutCollectorInterceptor());
+    public JobHistorySpout(JobHistoryContentFilter filter, Config config) {
+        this(filter, config, new JobHistorySpoutCollectorInterceptor());
     }
 
     /**
      * mostly this constructor signature is for unit test purpose as you can put customized interceptor here.
      */
-    public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig configManager, JobHistorySpoutCollectorInterceptor adaptor) {
+    public JobHistorySpout(JobHistoryContentFilter filter, Config config, JobHistorySpoutCollectorInterceptor adaptor) {
         this.contentFilter = filter;
-        this.configManager = configManager;
+        this.config = config;
         this.interceptor = adaptor;
-        callback = new DefaultJHFInputStreamCallback(contentFilter, configManager, interceptor);
+        callback = new DefaultJHFInputStreamCallback(contentFilter, interceptor);
     }
 
     private int calculatePartitionId(TopologyContext context) {
@@ -127,13 +128,14 @@ public class JobHistorySpout extends BaseRichSpout {
     @Override
     public void open(Map conf, TopologyContext context,
                      final SpoutOutputCollector collector) {
+        MRHistoryJobConfig.getInstance(config);
         partitionId = calculatePartitionId(context);
         // sanity verify 0<=partitionId<=numTotalPartitions-1
         if (partitionId < 0 || partitionId > numTotalPartitions) {
             throw new IllegalStateException("partitionId should be less than numTotalPartitions with partitionId "
                 + partitionId + " and numTotalPartitions " + numTotalPartitions);
         }
-        Class<? extends JobIdPartitioner> partitionerCls = configManager.getControlConfig().partitionerCls;
+        Class<? extends JobIdPartitioner> partitionerCls = MRHistoryJobConfig.get().getControlConfig().partitionerCls;
         JobIdPartitioner partitioner;
         try {
             partitioner = partitionerCls.newInstance();
@@ -142,16 +144,13 @@ public class JobHistorySpout extends BaseRichSpout {
             throw new IllegalStateException(e);
         }
         JobIdFilter jobIdFilter = new JobIdFilterByPartition(partitioner, numTotalPartitions, partitionId);
-        JobHistoryZKStateManager.instance().init(configManager.getZkStateConfig());
+        JobHistoryZKStateManager.instance().init(MRHistoryJobConfig.get().getZkStateConfig());
         JobHistoryZKStateManager.instance().ensureJobPartitions(numTotalPartitions);
         interceptor.setSpoutOutputCollector(collector);
 
         try {
-            jhfLCM = new JobHistoryDAOImpl(configManager.getJobHistoryEndpointConfig());
+            jhfLCM = new JobHistoryDAOImpl(MRHistoryJobConfig.get().getJobHistoryEndpointConfig());
             driver = new JHFCrawlerDriverImpl(
-                configManager.getEagleServiceConfig(),
-                configManager.getJobExtractorConfig(),
-                configManager.getControlConfig(),
                 callback,
                 jhfLCM,
                 jobIdFilter,
@@ -232,11 +231,9 @@ public class JobHistorySpout extends BaseRichSpout {
         }
 
         LOG.info("update process time stamp {}", minTimeStamp);
-        final MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = configManager.getEagleServiceConfig();
-        final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig = configManager.getJobExtractorConfig();
         Map<String, String> baseTags = new HashMap<String, String>() {
             {
-                put("site", jobExtractorConfig.site);
+                put("site", MRHistoryJobConfig.get().getJobExtractorConfig().site);
             }
         };
         JobProcessTimeStampEntity entity = new JobProcessTimeStampEntity();
@@ -245,12 +242,12 @@ public class JobHistorySpout extends BaseRichSpout {
         entity.setTags(baseTags);
 
         IEagleServiceClient client = new EagleServiceClientImpl(
-            eagleServiceConfig.eagleServiceHost,
-            eagleServiceConfig.eagleServicePort,
-            eagleServiceConfig.username,
-            eagleServiceConfig.password);
+            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServiceHost,
+            MRHistoryJobConfig.get().getEagleServiceConfig().eagleServicePort,
+            MRHistoryJobConfig.get().getEagleServiceConfig().username,
+            MRHistoryJobConfig.get().getEagleServiceConfig().password);
 
-        client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+        client.getJerseyClient().setReadTimeout(MRHistoryJobConfig.get().getJobExtractorConfig().readTimeoutSeconds * 1000);
 
         List<JobProcessTimeStampEntity> entities = new ArrayList<>();
         entities.add(entity);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8774b85c/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index e18fe07..5a60ee3 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -179,4 +179,6 @@ public class Constants {
     public static final String TASK_LEVEL = "task";
     public static final String JOB_COUNT_PER_DAY = "hadoop.job.day.count";
 
+    public static final String HADOOP_HISTORY_TOTAL_METRIC_FORMAT = "hadoop.%s.history.%s";
+    public static final String HADOOP_HISTORY_MINUTE_METRIC_FORMAT = "hadoop.%s.history.minute.%s";
 }