You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/09/02 03:49:02 UTC

incubator-eagle git commit: [EAGLE-518] add Job counter metrics for mr history job

Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 9488afc15 -> b2b16b745


[EAGLE-518] add Job counter metrics for mr history job

Author: wujinhu <wu...@126.com>

Closes #414 from wujinhu/EAGLE-518.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/b2b16b74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/b2b16b74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/b2b16b74

Branch: refs/heads/develop
Commit: b2b16b745e210eb80929d970820c322d557d93e0
Parents: 9488afc
Author: wujinhu <wu...@126.com>
Authored: Fri Sep 2 11:48:54 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Fri Sep 2 11:48:54 2016 +0800

----------------------------------------------------------------------
 .../history/crawler/JHFCrawlerDriverImpl.java   | 53 +++---------
 .../metrics/JobCountMetricsGenerator.java       | 88 ++++++++++++++++++++
 .../JobExecutionMetricsCreationListener.java    | 75 +++++++++++++++++
 .../JobEntityCreationEagleServiceListener.java  | 13 +++
 .../AbstractMetricsCreationListener.java        | 42 ----------
 .../JobExecutionMetricsCreationListener.java    |  9 +-
 .../TaskExecutionMetricsCreationListener.java   |  3 +-
 .../org/apache/eagle/jpm/util/Constants.java    |  4 +
 .../AbstractMetricsCreationListener.java        | 42 ++++++++++
 9 files changed, 239 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index 077f4e1..2f326fe 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -19,6 +19,7 @@
 package org.apache.eagle.jpm.mr.history.crawler;
 
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
+import org.apache.eagle.jpm.mr.history.metrics.JobCountMetricsGenerator;
 import org.apache.eagle.jpm.mr.history.parser.EagleJobStatus;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
 import org.apache.eagle.jpm.mr.historyentity.JobCountEntity;
@@ -62,15 +63,12 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
     private JobIdFilter jobFilter;
     private int partitionId;
     private TimeZone timeZone;
-    private MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig;
-    private MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig;
+    private JobCountMetricsGenerator jobCountMetricsGenerator;
 
     public JHFCrawlerDriverImpl(MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig,
                                 MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig,
                                 MRHistoryJobConfig.ControlConfig controlConfig, JHFInputStreamCallback reader,
                                 JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception {
-        this.eagleServiceConfig = eagleServiceConfig;
-        this.jobExtractorConfig = jobExtractorConfig;
         this.zeroBasedMonth = controlConfig.zeroBasedMonth;
         this.dryRun = controlConfig.dryRun;
         if (this.dryRun)  {
@@ -81,6 +79,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
         this.partitionId = partitionId;
         this.jobFilter = jobFilter;
         timeZone = TimeZone.getTimeZone(controlConfig.timeZone);
+        jobCountMetricsGenerator = new JobCountMetricsGenerator(eagleServiceConfig, jobExtractorConfig, timeZone);
     }
 
     /**
@@ -191,7 +190,13 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
                 jobHistoryFile);
         processedJobFileNames.add(jobHistoryFile);
 
-        flushJobCount();
+        jobCountMetricsGenerator.flush(
+            String.format(FORMAT_JOB_PROCESS_DATE,
+                this.processDate.year,
+                this.processDate.month + 1,
+                this.processDate.day),
+            this.processDate.year, this.processDate.month, this.processDate.day
+        );
         Long modifiedTime = item.getLeft();
         return modifiedTime;
     }
@@ -237,44 +242,6 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
         }
     }
 
-    private void flushJobCount() throws Exception {
-        List<Pair<String, String>> jobs = JobHistoryZKStateManager.instance().getProcessedJobs(
-            String.format(FORMAT_JOB_PROCESS_DATE, this.processDate.year, this.processDate.month + 1, this.processDate.day)
-        );
-        JobCountEntity entity = new JobCountEntity();
-        entity.setTotal(jobs.size());
-        entity.setFail(0);
-        jobs.stream().filter(job -> !job.getRight().equals(EagleJobStatus.SUCCEEDED.toString())).forEach(
-            job -> entity.setFail(1 + entity.getFail())
-        );
-
-        IEagleServiceClient client = new EagleServiceClientImpl(
-            eagleServiceConfig.eagleServiceHost,
-            eagleServiceConfig.eagleServicePort,
-            eagleServiceConfig.username,
-            eagleServiceConfig.password);
-
-
-        GregorianCalendar cal = new GregorianCalendar(this.processDate.year, this.processDate.month, this.processDate.day, 0, 0, 0);
-        cal.setTimeZone(timeZone);
-        entity.setTimestamp(cal.getTimeInMillis());
-        @SuppressWarnings("serial")
-        Map<String, String> baseTags = new HashMap<String, String>() {
-            {
-                put("site", jobExtractorConfig.site);
-            }
-        };
-        entity.setTags(baseTags);
-        List<JobCountEntity> entities = new ArrayList<>();
-        entities.add(entity);
-
-        LOG.info("start flushing entities of total number " + entities.size());
-        client.create(entities);
-        LOG.info("finish flushing entities of total number " + entities.size());
-        client.getJerseyClient().destroy();
-        client.close();
-    }
-
     private void advanceOneDay() throws Exception {
         //flushJobCount();
         GregorianCalendar cal = new GregorianCalendar(timeZone);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
new file mode 100644
index 0000000..0e0e5e9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.mr.history.metrics;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
+import org.apache.eagle.jpm.mr.history.parser.EagleJobStatus;
+import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class JobCountMetricsGenerator {
+    private static final Logger LOG = LoggerFactory.getLogger(JobCountMetricsGenerator.class);
+
+    private final MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig;
+    private final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig;
+    private final TimeZone timeZone;
+
+    public JobCountMetricsGenerator(MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig,
+                                    MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig,
+                                    TimeZone timeZone) {
+        this.eagleServiceConfig = eagleServiceConfig;
+        this.jobExtractorConfig = jobExtractorConfig;
+        this.timeZone = timeZone;
+    }
+
+    /**
+     * Sends one {@link Constants#JOB_COUNT_PER_DAY} metric for {@code date}; its two values are the
+     * number of processed jobs and the number of those whose state is not SUCCEEDED.
+     *
+     * @param date  key used to look up the processed jobs in the ZK state manager
+     * @param month passed unchanged to {@link GregorianCalendar}, i.e. zero-based
+     */
+    public void flush(String date, int year, int month, int day) throws Exception {
+        List<Pair<String, String>> jobs = JobHistoryZKStateManager.instance().getProcessedJobs(date);
+        int fail = 0;
+        for (Pair<String, String> job : jobs) {
+            if (!EagleJobStatus.SUCCEEDED.toString().equals(job.getRight())) {
+                ++fail;
+            }
+        }
+        GregorianCalendar cal = new GregorianCalendar(year, month, day);
+        cal.setTimeZone(timeZone);
+        Map<String, String> baseTags = new HashMap<>();
+        baseTags.put("site", jobExtractorConfig.site);
+        GenericMetricEntity metricEntity = new GenericMetricEntity();
+        metricEntity.setTimestamp(cal.getTimeInMillis());
+        metricEntity.setPrefix(Constants.JOB_COUNT_PER_DAY);
+        metricEntity.setValue(new double[]{jobs.size(), fail});
+        metricEntity.setTags(baseTags);
+        List<GenericMetricEntity> entities = new ArrayList<>();
+        entities.add(metricEntity);
+
+        IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort, eagleServiceConfig.username, eagleServiceConfig.password);
+        try {
+            LOG.info("start flushing entities of total number " + entities.size());
+            client.create(entities);
+            LOG.info("finish flushing entities of total number " + entities.size());
+        } finally {
+            // always release the client, even when create() throws
+            client.getJerseyClient().destroy();
+            client.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
new file mode 100644
index 0000000..2129bed
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.mr.history.metrics;
+
+import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.jpm.util.metrics.AbstractMetricsCreationListener;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class JobExecutionMetricsCreationListener extends AbstractMetricsCreationListener<JobExecutionAPIEntity> {
+
+    /**
+     * Builds the metrics of a job: its duration, the map and reduce (total, failed ratio)
+     * pairs and one metric per job counter whose field name is prefixed with "history.".
+     * Returns an empty list when {@code entity} is null.
+     */
+    @Override
+    public List<GenericMetricEntity> generateMetrics(JobExecutionAPIEntity entity) {
+        List<GenericMetricEntity> metrics = new ArrayList<>();
+        if (entity == null) {
+            return metrics;
+        }
+        Long timeStamp = entity.getTimestamp();
+        Map<String, String> tags = entity.getTags();
+        metrics.add(metricWrapper(timeStamp, Constants.JOB_EXECUTION_TIME,
+            new double[]{entity.getDurationTime()}, tags));
+        metrics.add(metricWrapper(timeStamp, Constants.MAP_COUNT_RATIO,
+            new double[]{entity.getNumTotalMaps(), failureRatio(entity.getNumFailedMaps(), entity.getNumTotalMaps())}, tags));
+        metrics.add(metricWrapper(timeStamp, Constants.REDUCE_COUNT_RATIO,
+            new double[]{entity.getNumTotalReduces(), failureRatio(entity.getNumFailedReduces(), entity.getNumTotalReduces())}, tags));
+
+        org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounters = entity.getJobCounters();
+        if (jobCounters != null && jobCounters.getCounters() != null) {
+            for (Map<String, Long> metricGroup : jobCounters.getCounters().values()) {
+                for (Map.Entry<String, Long> entry : metricGroup.entrySet()) {
+                    String metricName = entry.getKey().toLowerCase(java.util.Locale.ROOT);
+                    metrics.add(metricWrapper(timeStamp, "history." + metricName, new double[]{entry.getValue()}, tags));
+                }
+            }
+        }
+        return metrics;
+    }
+
+    /** Returns failed / total, or 0 when total is 0 (e.g. no reduces) so that no NaN value is emitted. */
+    private static double failureRatio(double failed, double total) {
+        return total == 0 ? 0.0 : failed / total;
+    }
+
+    /** Formats {@code field} as a job-level metric name using {@link Constants#hadoopMetricFormat}. */
+    @Override
+    public String buildMetricName(String field) {
+        return String.format(Constants.hadoopMetricFormat, Constants.JOB_LEVEL, field);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index 30eeb54..623a776 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -19,9 +19,11 @@
 package org.apache.eagle.jpm.mr.history.parser;
 
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
+import org.apache.eagle.jpm.mr.history.metrics.JobExecutionMetricsCreationListener;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
 import org.apache.eagle.jpm.mr.historyentity.*;
 import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.log.entity.GenericMetricEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
@@ -43,6 +45,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
     List<JobEventAPIEntity> jobEvents = new ArrayList<>();
     List<TaskExecutionAPIEntity> taskExecs = new ArrayList<>();
     List<TaskAttemptExecutionAPIEntity> taskAttemptExecs = new ArrayList<>();
+    private JobExecutionMetricsCreationListener jobExecutionMetricsCreationListener = new JobExecutionMetricsCreationListener();
     private TimeZone timeZone;
 
     public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager) {
@@ -91,6 +94,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
 
         client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
         logger.info("start flushing entities of total number " + list.size());
+        List<GenericMetricEntity> metricEntities = new ArrayList<>();
         for (int i = 0; i < list.size(); i++) {
             JobBaseAPIEntity entity = list.get(i);
             if (entity instanceof JobExecutionAPIEntity) {
@@ -98,6 +102,8 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
                 JobHistoryZKStateManager.instance().updateProcessedJob(timeStamp2Date(entity.getTimestamp()),
                     entity.getTags().get(MRJobTagName.JOB_ID.toString()),
                     ((JobExecutionAPIEntity) entity).getCurrentState());
+
+                metricEntities.addAll(jobExecutionMetricsCreationListener.generateMetrics((JobExecutionAPIEntity)entity));
             } else if (entity instanceof JobEventAPIEntity) {
                 jobEvents.add((JobEventAPIEntity) entity);
             } else if (entity instanceof TaskExecutionAPIEntity) {
@@ -113,6 +119,12 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
             checkResult(result);
             jobs.clear();
         }
+        if (metricEntities.size() > 0) {
+            logger.info("flush job metrics of number " + metricEntities.size());
+            result = client.create(metricEntities);
+            checkResult(result);
+            metricEntities.clear();
+        }
         if (jobEvents.size() > 0) {
             logger.info("flush JobEventAPIEntity of number " + jobEvents.size());
             result = client.create(jobEvents);
@@ -131,6 +143,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
             checkResult(result);
             taskAttemptExecs.clear();
         }
+
         logger.info("finish flushing entities of total number " + list.size());
         list.clear();
         client.getJerseyClient().destroy();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java
deleted file mode 100644
index 8634b6a..0000000
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.mr.running.parser.metrics;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.GenericMetricEntity;
-
-import java.util.List;
-import java.util.Map;
-
-public abstract class AbstractMetricsCreationListener<E extends TaggedLogAPIEntity> {
-
-    public abstract List<GenericMetricEntity> generateMetrics(E entity);
-
-    protected abstract String buildMetricName(String field);
-
-    protected GenericMetricEntity metricWrapper(Long timestamp, String field, double value, Map<String, String> tags) {
-        String metricName = buildMetricName(field);
-        GenericMetricEntity metricEntity = new GenericMetricEntity();
-        metricEntity.setTimestamp(timestamp);
-        metricEntity.setTags(tags);
-        metricEntity.setPrefix(metricName);
-        metricEntity.setValue(new double[]{value});
-        return metricEntity;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java
index 76d2a19..8b30d45 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java
@@ -21,6 +21,7 @@ package org.apache.eagle.jpm.mr.running.parser.metrics;
 import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.jpm.util.metrics.AbstractMetricsCreationListener;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -34,15 +35,15 @@ public class JobExecutionMetricsCreationListener extends AbstractMetricsCreation
         if (entity != null) {
             Long currentTime = System.currentTimeMillis();
             Map<String, String> tags = entity.getTags();
-            metrics.add(metricWrapper(currentTime, Constants.ALLOCATED_MB, entity.getAllocatedMB(), tags));
-            metrics.add(metricWrapper(currentTime, Constants.ALLOCATED_VCORES, entity.getAllocatedVCores(), tags));
-            metrics.add(metricWrapper(currentTime, Constants.RUNNING_CONTAINERS, entity.getRunningContainers(), tags));
+            metrics.add(metricWrapper(currentTime, Constants.ALLOCATED_MB, new double[]{entity.getAllocatedMB()}, tags));
+            metrics.add(metricWrapper(currentTime, Constants.ALLOCATED_VCORES, new double[]{entity.getAllocatedVCores()}, tags));
+            metrics.add(metricWrapper(currentTime, Constants.RUNNING_CONTAINERS, new double[]{entity.getRunningContainers()}, tags));
             org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounters = entity.getJobCounters();
             if (jobCounters != null && jobCounters.getCounters() != null) {
                 for (Map<String, Long> metricGroup : jobCounters.getCounters().values()) {
                     for (Map.Entry<String, Long> entry : metricGroup.entrySet()) {
                         String metricName = entry.getKey().toLowerCase();
-                        metrics.add(metricWrapper(currentTime, metricName, entry.getValue(), tags));
+                        metrics.add(metricWrapper(currentTime, metricName, new double[]{entry.getValue()}, tags));
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java
index d0b0d57..9f22a7f 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java
@@ -21,6 +21,7 @@ package org.apache.eagle.jpm.mr.running.parser.metrics;
 import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.jpm.util.metrics.AbstractMetricsCreationListener;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -33,7 +34,7 @@ public class TaskExecutionMetricsCreationListener extends AbstractMetricsCreatio
         if (entity != null) {
             Long currentTime = System.currentTimeMillis();
             Map<String, String> tags = entity.getTags();
-            metrics.add(metricWrapper(currentTime, Constants.TASK_EXECUTION_TIME, entity.getDuration(), tags));
+            metrics.add(metricWrapper(currentTime, Constants.TASK_EXECUTION_TIME, new double[]{entity.getDuration()}, tags));
         }
         return metrics;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index ec56eac..e18fe07 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -172,7 +172,11 @@ public class Constants {
     public static final String ALLOCATED_VCORES = "allocatedvcores";
     public static final String RUNNING_CONTAINERS = "runningcontainers";
     public static final String TASK_EXECUTION_TIME = "taskduration";
+    public static final String JOB_EXECUTION_TIME = "jobduration";
+    public static final String MAP_COUNT_RATIO = "map.count.ratio";
+    public static final String REDUCE_COUNT_RATIO = "reduce.count.ratio";
     public static final String JOB_LEVEL = "job";
     public static final String TASK_LEVEL = "task";
+    public static final String JOB_COUNT_PER_DAY = "hadoop.job.day.count";
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/metrics/AbstractMetricsCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/metrics/AbstractMetricsCreationListener.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/metrics/AbstractMetricsCreationListener.java
new file mode 100644
index 0000000..dd61432
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/metrics/AbstractMetricsCreationListener.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.metrics;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractMetricsCreationListener<E extends TaggedLogAPIEntity> {
+    /** Produces the metric entities derived from {@code entity}. */
+    public abstract List<GenericMetricEntity> generateMetrics(E entity);
+    /** Turns a field name into the metric name that is stored as the entity prefix. */
+    protected abstract String buildMetricName(String field);
+    /** Creates a metric entity carrying {@code values}, named via {@link #buildMetricName(String)}. */
+    protected GenericMetricEntity metricWrapper(Long timestamp, String field, double[] values, Map<String, String> tags) {
+        // resolve the name first, exactly as before, then populate a fresh entity
+        final String prefix = buildMetricName(field);
+        final GenericMetricEntity wrapped = new GenericMetricEntity();
+        wrapped.setTimestamp(timestamp);
+        wrapped.setTags(tags);
+        wrapped.setPrefix(prefix);
+        wrapped.setValue(values);
+        return wrapped;
+    }
+}