You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/08/31 06:21:34 UTC

incubator-eagle git commit: [EAGLE-514] Add two job count apis

Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 4aa5b4526 -> a66f64cf9


[EAGLE-514] Add two job count apis

https://issues.apache.org/jira/browse/EAGLE-514

1. adding two job counting apis

2. add tracking url in running/history job execution entity

3. unify the status presentation for job/task execution entity

4. unify the name of the common fields between running job entity & history job entity

Author: Qingwen Zhao <qi...@gmail.com>

Closes #408 from qingwen220/jobStats.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/a66f64cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/a66f64cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/a66f64cf

Branch: refs/heads/develop
Commit: a66f64cf9f4212f4923f0e6ea6c7270449aa2ce4
Parents: 4aa5b45
Author: Qingwen Zhao <qi...@gmail.com>
Authored: Wed Aug 31 14:21:22 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Wed Aug 31 14:21:22 2016 +0800

----------------------------------------------------------------------
 .../mr/historyentity/JobExecutionAPIEntity.java |  11 +
 .../mr/runningentity/JobExecutionAPIEntity.java |  36 ++-
 .../jpm/mr/history/MRHistoryJobConfig.java      |   2 +
 .../history/crawler/JHFCrawlerDriverImpl.java   |   2 +-
 .../jpm/mr/history/parser/EagleJobStatus.java   |   2 +-
 .../mr/history/parser/JHFEventReaderBase.java   |  21 +-
 .../mr/history/parser/JHFMRVer1EventReader.java |   5 +-
 .../mr/history/parser/JHFMRVer2EventReader.java |  15 +-
 .../jpm/mr/history/parser/JHFParserFactory.java |   4 +-
 .../JobEntityCreationEagleServiceListener.java  |   6 +-
 .../src/main/resources/application.conf         |   1 +
 .../jpm/mr/running/parser/MRJobParser.java      |   5 +-
 .../eagle/service/jpm/MRJobCountHelper.java     | 121 ++++++++
 .../service/jpm/MRJobExecutionResource.java     | 286 +++++++++++++------
 .../service/jpm/MRJobTaskCountResponse.java     |  65 +++++
 .../service/jpm/MRJobTaskGroupResponse.java     |  41 ---
 .../service/jpm/TaskCountByDurationHelper.java  | 106 +++++++
 .../jpm/TestJobCountPerBucketHelper.java        |  87 ++++++
 .../service/jpm/TestMRJobExecutionResource.java |  99 -------
 .../service/jpm/TestTaskCountPerJobHelper.java  |  96 +++++++
 .../org/apache/eagle/jpm/util/Constants.java    |   2 +
 21 files changed, 752 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
index 97e77b2..cdc5810 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
@@ -89,6 +89,17 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity {
     private int totalReduceAttempts;
     @Column("ac")
     private int failedReduceAttempts;
+    @Column("ad")
+    private String trackingUrl;
+
+    public String getTrackingUrl() {
+        return trackingUrl;
+    }
+
+    public void setTrackingUrl(String trackingUrl) {
+        this.trackingUrl = trackingUrl;
+        valueChanged("trackingUrl");
+    }
 
     public long getDurationTime() {
         return durationTime;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
index dd81eb4..245fc0f 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
@@ -49,11 +49,11 @@ public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
     @Column("e")
     private int numTotalMaps;
     @Column("f")
-    private int mapsCompleted;
+    private int numFinishedMaps;
     @Column("g")
     private int numTotalReduces;
     @Column("h")
-    private int reducesCompleted;
+    private int numFinishedReduces;
     @Column("i")
     private double mapProgress;
     @Column("j")
@@ -112,6 +112,18 @@ public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
     private long submissionTime;
     @Column("ak")
     private String internalState;
+    @Column("al")
+    private String trackingUrl;
+
+    public String getTrackingUrl() {
+        return trackingUrl;
+    }
+
+    public void setTrackingUrl(String trackingUrl) {
+        this.trackingUrl = trackingUrl;
+        valueChanged("trackingUrl");
+    }
+
 
     public JobConfig getJobConfig() {
         return jobConfig;
@@ -176,13 +188,13 @@ public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
         valueChanged("numTotalMaps");
     }
 
-    public int getMapsCompleted() {
-        return mapsCompleted;
+    public int getNumFinishedMaps() {
+        return numFinishedMaps;
     }
 
-    public void setMapsCompleted(int mapsCompleted) {
-        this.mapsCompleted = mapsCompleted;
-        valueChanged("mapsCompleted");
+    public void setNumFinishedMaps(int numFinishedMaps) {
+        this.numFinishedMaps = numFinishedMaps;
+        valueChanged("numFinishedMaps");
     }
 
     public int getNumTotalReduces() {
@@ -194,13 +206,13 @@ public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
         valueChanged("numTotalReduces");
     }
 
-    public int getReducesCompleted() {
-        return reducesCompleted;
+    public int getNumFinishedReduces() {
+        return numFinishedReduces;
     }
 
-    public void setReducesCompleted(int reducesCompleted) {
-        this.reducesCompleted = reducesCompleted;
-        valueChanged("reducesCompleted");
+    public void setNumFinishedReduces(int numFinishedReduces) {
+        this.numFinishedReduces = numFinishedReduces;
+        valueChanged("numFinishedReduces");
     }
 
     public double getMapProgress() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
index ae86904..c0943de 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
@@ -86,6 +86,7 @@ public class MRHistoryJobConfig implements Serializable {
 
     public static class JobHistoryEndpointConfig implements Serializable {
         public String nnEndpoint;
+        public String mrHistoryServerUrl;
         public String basePath;
         public boolean pathContainsJobTrackerName;
         public String jobTrackerName;
@@ -173,6 +174,7 @@ public class MRHistoryJobConfig implements Serializable {
         this.jobHistoryEndpointConfig.basePath = config.getString("dataSourceConfig.basePath");
         this.jobHistoryEndpointConfig.jobTrackerName = config.getString("dataSourceConfig.jobTrackerName");
         this.jobHistoryEndpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint");
+        this.jobHistoryEndpointConfig.mrHistoryServerUrl = config.getString("dataSourceConfig.mrHistoryServerUrl");
         this.jobHistoryEndpointConfig.pathContainsJobTrackerName = config.getBoolean("dataSourceConfig.pathContainsJobTrackerName");
         this.jobHistoryEndpointConfig.principal = config.getString("dataSourceConfig.principal");
         this.jobHistoryEndpointConfig.keyTab = config.getString("dataSourceConfig.keytab");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index e16ecce..1a17751 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -247,7 +247,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
         JobCountEntity entity = new JobCountEntity();
         entity.setTotal(jobs.size());
         entity.setFail(0);
-        jobs.stream().filter(job -> !job.getRight().equals(EagleJobStatus.SUCCESS.toString())).forEach(
+        jobs.stream().filter(job -> !job.getRight().equals(EagleJobStatus.SUCCEEDED.toString())).forEach(
             job -> entity.setFail(1 + entity.getFail())
         );
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
index fb218e3..24fa097 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
@@ -23,7 +23,7 @@ public enum EagleJobStatus {
     LAUNCHED,
     PREP,
     RUNNING,
-    SUCCESS,
+    SUCCEEDED,
     KILLED,
     FAILED;
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index 6916aad..1570956 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -18,6 +18,8 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
+import org.apache.commons.io.FilenameUtils;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.eagle.jpm.mr.historyentity.*;
@@ -32,6 +34,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -65,6 +69,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     protected String queueName;
     protected Long jobLaunchTime;
     protected JobHistoryContentFilter filter;
+    private JobHistoryEndpointConfig jobHistoryEndpointConfig;
 
     protected final List<HistoryJobEntityLifecycleListener> jobEntityLifecycleListeners = new ArrayList<>();
 
@@ -96,8 +101,9 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
      *
      * @param baseTags
      */
-    public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
+    public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, JobHistoryEndpointConfig jobHistoryEndpointConfig) {
         this.filter = filter;
+        this.jobHistoryEndpointConfig = jobHistoryEndpointConfig;
 
         this.baseTags = baseTags;
         jobSubmitEventEntity = new JobEventAPIEntity();
@@ -155,6 +161,18 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
         }
     }
 
+    private String buildJobTrackingUrl(String jobId) {
+        String jobTrackingUrlBase = this.jobHistoryEndpointConfig.mrHistoryServerUrl + "/jobhistory/job/";
+        try {
+            URI oldUri = new URI(jobTrackingUrlBase);
+            URI resolved = oldUri.resolve(jobId);
+            return resolved.toString();
+        } catch (URISyntaxException e) {
+            LOG.warn("Tracking url build failed with baseURL=%s, resolvePart=%s", jobTrackingUrlBase, jobId);
+            return jobTrackingUrlBase;
+        }
+    }
+
     /**
      * ...
      * @param id
@@ -236,6 +254,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), queueName);
             jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.jobType);
 
+            jobExecutionEntity.setTrackingUrl(buildJobTrackingUrl(jobId));
             jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS));
             jobExecutionEntity.setStartTime(jobLaunchEventEntity.getTimestamp());
             jobExecutionEntity.setEndTime(jobFinishEventEntity.getTimestamp());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
index e20836f..0e9458a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
@@ -48,8 +49,8 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
      *
      * @param baseTags
      */
-    public JHFMRVer1EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
-        super(baseTags, configuration, filter);
+    public JHFMRVer1EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, JobHistoryEndpointConfig jobHistoryEndpointConfig) {
+        super(baseTags, configuration, filter, jobHistoryEndpointConfig);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
index f21fd41..74f84f6 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
 import org.apache.eagle.jpm.util.jobcounter.JobCounters;
@@ -43,8 +44,8 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
      *
      * @throws IOException
      */
-    public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
-        super(baseTags, configuration, filter);
+    public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, JobHistoryEndpointConfig jobHistoryEndpointConfig) {
+        super(baseTags, configuration, filter, jobHistoryEndpointConfig);
     }
 
     @SuppressWarnings("deprecation")
@@ -233,7 +234,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
         if (js.getFailedReduces() != null) {
             values.put(Keys.FAILED_REDUCES, js.getFailedReduces().toString());
         }
-        values.put(Keys.JOB_STATUS, EagleJobStatus.SUCCESS.name());
+        values.put(Keys.JOB_STATUS, EagleJobStatus.SUCCEEDED.name());
         handleJob(wrapper.getType(), values, js.getTotalCounters());
     }
 
@@ -289,7 +290,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
             values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
         }
         if (js.getStatus() != null) {
-            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+            values.put(Keys.TASK_STATUS, js.getStatus().toString());
         }
         handleTask(RecordTypes.Task, wrapper.getType(), values, js.getCounters());
     }
@@ -308,7 +309,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
             values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
         }
         if (js.getStatus() != null) {
-            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+            values.put(Keys.TASK_STATUS, js.getStatus().toString());
         }
         if (js.getError() != null) {
             values.put(Keys.ERROR, js.getError().toString());
@@ -381,7 +382,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
             values.put(Keys.TASK_TYPE, js.getTaskType().toString());
         }
         if (js.getTaskStatus() != null) {
-            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
+            values.put(Keys.TASK_STATUS, js.getTaskStatus().toString());
         }
         if (js.getAttemptId() != null) {
             values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
@@ -419,7 +420,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
             values.put(Keys.TASK_TYPE, js.getTaskType().toString());
         }
         if (js.getTaskStatus() != null) {
-            values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
+            values.put(Keys.TASK_STATUS, js.getTaskStatus().toString());
         }
         if (js.getAttemptId() != null) {
             values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
index 718612d..386d50c 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -46,7 +46,7 @@ public class JHFParserFactory {
 
         switch (f) {
             case MRVer2:
-                JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter);
+                JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter, configManager.getJobHistoryEndpointConfig());
                 reader2.addListener(new JobEntityCreationEagleServiceListener(configManager));
                 reader2.addListener(new TaskFailureListener(configManager));
                 reader2.addListener(new TaskAttemptCounterListener(configManager));
@@ -57,7 +57,7 @@ public class JHFParserFactory {
                 break;
             case MRVer1:
             default:
-                JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter);
+                JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter, configManager.getJobHistoryEndpointConfig());
                 reader1.addListener(new JobEntityCreationEagleServiceListener(configManager));
                 reader1.addListener(new TaskFailureListener(configManager));
                 reader1.addListener(new TaskAttemptCounterListener(configManager));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index a681aca..520fbbc 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -43,7 +43,6 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
     List<JobEventAPIEntity> jobEvents = new ArrayList<>();
     List<TaskExecutionAPIEntity> taskExecs = new ArrayList<>();
     List<TaskAttemptExecutionAPIEntity> taskAttemptExecs = new ArrayList<>();
-    private JobHistoryZKStateManager zkState;
     private TimeZone timeZone;
 
     public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager) {
@@ -56,7 +55,6 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
             throw new IllegalArgumentException("batchSize must be greater than 0 when it is provided");
         }
         this.batchSize = batchSize;
-        zkState = new JobHistoryZKStateManager(configManager.getZkStateConfig());
         timeZone = TimeZone.getTimeZone(configManager.getControlConfig().timeZone);
     }
 
@@ -92,12 +90,13 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
             eagleServiceConfig.password);
 
         client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+        JobHistoryZKStateManager zkState = new JobHistoryZKStateManager(configManager.getZkStateConfig());
         logger.info("start flushing entities of total number " + list.size());
         for (int i = 0; i < list.size(); i++) {
             JobBaseAPIEntity entity = list.get(i);
             if (entity instanceof JobExecutionAPIEntity) {
                 jobs.add((JobExecutionAPIEntity) entity);
-                this.zkState.updateProcessedJob(timeStamp2Date(entity.getTimestamp()),
+                zkState.updateProcessedJob(timeStamp2Date(entity.getTimestamp()),
                     entity.getTags().get(MRJobTagName.JOB_ID.toString()),
                     ((JobExecutionAPIEntity) entity).getCurrentState());
             } else if (entity instanceof JobEventAPIEntity) {
@@ -108,6 +107,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
                 taskAttemptExecs.add((TaskAttemptExecutionAPIEntity) entity);
             }
         }
+        zkState.close();
         GenericServiceAPIResponseEntity result;
         if (jobs.size() > 0) {
             logger.info("flush JobExecutionAPIEntity of number " + jobs.size());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index db2c716..de874a6 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -40,6 +40,7 @@
     "zkRetryTimes" : 3,
     "zkRetryInterval" : 20000,
     "nnEndpoint" : "hdfs://sandbox.hortonworks.com:8020",
+    "mrHistoryServerUrl" : "http://sandbox.hortonworks.com:19888",
     "principal":"", #if not need, then empty
     "keytab":"",
     "basePath" : "/mr-history/done",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 9e156fa..5811f72 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -207,14 +207,15 @@ public class MRJobParser implements Runnable {
             }
             jobExecutionAPIEntity.setTimestamp(app.getStartedTime());
             jobExecutionAPIEntity.setSubmissionTime(app.getStartedTime());
+            jobExecutionAPIEntity.setTrackingUrl(app.getTrackingUrl());
             jobExecutionAPIEntity.setStartTime(mrJob.getStartTime());
             jobExecutionAPIEntity.setDurationTime(mrJob.getElapsedTime());
             jobExecutionAPIEntity.setCurrentState(mrJob.getState());
             jobExecutionAPIEntity.setInternalState(mrJob.getState());
             jobExecutionAPIEntity.setNumTotalMaps(mrJob.getMapsTotal());
-            jobExecutionAPIEntity.setMapsCompleted(mrJob.getMapsCompleted());
+            jobExecutionAPIEntity.setNumFinishedMaps(mrJob.getMapsCompleted());
             jobExecutionAPIEntity.setNumTotalReduces(mrJob.getReducesTotal());
-            jobExecutionAPIEntity.setReducesCompleted(mrJob.getReducesCompleted());
+            jobExecutionAPIEntity.setNumFinishedReduces(mrJob.getReducesCompleted());
             jobExecutionAPIEntity.setMapProgress(mrJob.getMapProgress());
             jobExecutionAPIEntity.setReduceProgress(mrJob.getReduceProgress());
             jobExecutionAPIEntity.setMapsPending(mrJob.getMapsPending());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
new file mode 100644
index 0000000..93c6c00
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
@@ -0,0 +1,121 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse.JobCountResponse;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse.UnitJobCount;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MRJobCountHelper {
+
+    public void initJobCountList(List<UnitJobCount> jobCounts, long startTime, long endTime, long intervalInSecs) {
+        for (long i = startTime / intervalInSecs; i * intervalInSecs <= endTime; i++) {
+            jobCounts.add(new UnitJobCount(i * intervalInSecs));
+        }
+    }
+
+    public String moveTimeforwardOneDay(String startTime) throws ParseException {
+        long timeInSecs = DateTimeUtil.humanDateToSeconds(startTime);
+        timeInSecs -= 24L * 60L * 60L;
+        return DateTimeUtil.secondsToHumanDate(timeInSecs);
+    }
+
+    public JobCountResponse getRunningJobCount(List<JobExecutionAPIEntity> jobDurations,
+                                        long startTimeInSecs,
+                                        long endTimeInSecs,
+                                        long intervalInSecs) {
+        JobCountResponse response = new JobCountResponse();
+        List<UnitJobCount> jobCounts = new ArrayList<>();
+        initJobCountList(jobCounts, startTimeInSecs, endTimeInSecs, intervalInSecs);
+        for (JobExecutionAPIEntity jobDuration: jobDurations) {
+            countJob(jobCounts, jobDuration.getStartTime() / 1000, jobDuration.getEndTime() / 1000, intervalInSecs, jobDuration.getTags().get(MRJobTagName.JOB_TYPE.toString()));
+        }
+        response.jobCounts = jobCounts;
+        return response;
+    }
+
+    public JobCountResponse getHistoryJobCount(List<JobExecutionAPIEntity> jobDurations, String timeList) {
+        JobCountResponse response = new JobCountResponse();
+        List<UnitJobCount> jobCounts = new ArrayList<>();
+        List<Long> times = TaskCountByDurationHelper.parseTimeList(timeList);
+        for (int i = 0; i < times.size(); i++) {
+            jobCounts.add(new UnitJobCount(times.get(i)));
+        }
+        for (JobExecutionAPIEntity job : jobDurations) {
+            int jobIndex = TaskCountByDurationHelper.getPosition(times, job.getDurationTime());
+            UnitJobCount counter = jobCounts.get(jobIndex);
+            countJob(counter, job.getTags().get(MRJobTagName.JOB_TYPE.toString()));
+        }
+        response.jobCounts = jobCounts;
+        return response;
+    }
+
+    public void countJob(UnitJobCount counter, String jobType) {
+        if (null  ==  jobType) {
+            jobType = "null";
+        }
+        counter.jobCount++;
+        if (counter.jobCountByType.containsKey(jobType)) {
+            counter.jobCountByType.put(jobType, counter.jobCountByType.get(jobType) + 1);
+        } else {
+            counter.jobCountByType.put(jobType, 1L);
+        }
+    }
+
+    public void countJob(List<UnitJobCount> jobCounts, long jobStartTimeSecs, long jobEndTimeSecs, long intervalInSecs, String jobType) {
+        long startCountPoint = jobCounts.get(0).timeBucket;
+        if (jobEndTimeSecs < startCountPoint) {
+            return;
+        }
+        int startIndex = 0;
+        if (jobStartTimeSecs > startCountPoint) {
+            long relativeStartTime = jobStartTimeSecs - startCountPoint;
+            startIndex = (int) (relativeStartTime / intervalInSecs) + (relativeStartTime % intervalInSecs == 0 ? 0 : 1);
+        }
+        long relativeEndTime = jobEndTimeSecs - startCountPoint;
+        int endIndex = (int) (relativeEndTime / intervalInSecs);
+
+        for (int i = startIndex; i <= endIndex && i < jobCounts.size(); i++) {
+            countJob(jobCounts.get(i), jobType);
+        }
+    }
+
+    public List<String> getSearchTimeDuration(List<JobExecutionAPIEntity> jobEntities) {
+        List<String> pair = new ArrayList<>();
+        long minStartTime = System.currentTimeMillis();
+        long maxEndTime = 0;
+        for (JobExecutionAPIEntity jobEntity : jobEntities) {
+            if (minStartTime > jobEntity.getStartTime()) {
+                minStartTime = jobEntity.getStartTime();
+            }
+            if (maxEndTime < jobEntity.getEndTime()) {
+                maxEndTime = jobEntity.getEndTime();
+            }
+        }
+        pair.add(DateTimeUtil.millisecondsToHumanDateWithSeconds(minStartTime));
+        pair.add(DateTimeUtil.millisecondsToHumanDateWithSeconds(maxEndTime));
+        return pair;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
index 3e487ae..5af9811 100644
--- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
@@ -18,19 +18,25 @@
 
 package org.apache.eagle.service.jpm;
 
-
 import static org.apache.eagle.jpm.util.MRJobTagName.JOB_ID;
+import static org.apache.eagle.jpm.util.MRJobTagName.TASK_TYPE;
 
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
 import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
 import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.service.generic.GenericEntityServiceResource;
+import org.apache.eagle.service.generic.ListQueryResource;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse.JobCountResponse;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse.TaskCountPerJobResponse;
 
 import org.apache.commons.lang.time.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.ParseException;
 import java.util.*;
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
@@ -59,14 +65,14 @@ public class MRJobExecutionResource {
         List<TaggedLogAPIEntity> jobs = new ArrayList<>();
         List<TaggedLogAPIEntity> finishedJobs = new ArrayList<>();
         Set<String> jobIds = new HashSet<>();
-        final Map<String,Object> meta = new HashMap<>();
+        final Map<String, Object> meta = new HashMap<>();
         StopWatch stopWatch = new StopWatch();
 
         stopWatch.start();
         String jobQuery = String.format(query, Constants.JPA_JOB_EXECUTION_SERVICE_NAME);
         GenericServiceAPIResponseEntity<TaggedLogAPIEntity> res =
-                resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin,
-                top,filterIfMissing, parallel, metricName, verbose);
+            resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin,
+                top, filterIfMissing, parallel, metricName, verbose);
         if (res.isSuccess() && res.getObj() != null) {
             for (TaggedLogAPIEntity o : res.getObj()) {
                 finishedJobs.add(o);
@@ -74,10 +80,10 @@ public class MRJobExecutionResource {
             }
             jobQuery = String.format(query, Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME);
             res = resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin,
-                    top,filterIfMissing, parallel, metricName, verbose);
+                top, filterIfMissing, parallel, metricName, verbose);
             if (res.isSuccess() && res.getObj() != null) {
                 for (TaggedLogAPIEntity o : res.getObj()) {
-                    if (! isDuplicate(jobIds, o)) {
+                    if (!isDuplicate(jobIds, o)) {
                         jobs.add(o);
                     }
                 }
@@ -92,7 +98,7 @@ public class MRJobExecutionResource {
             response.setException(new Exception(res.getException()));
         }
         meta.put(TOTAL_RESULTS, jobs.size());
-        meta.put(ELAPSEDMS,stopWatch.getTime());
+        meta.put(ELAPSEDMS, stopWatch.getTime());
         response.setObj(jobs);
         response.setMeta(meta);
         return response;
@@ -107,7 +113,7 @@ public class MRJobExecutionResource {
     }
 
     private String buildCondition(String jobId, String jobDefId, String site) {
-        String conditionFormat = "@site=\"%s\"" ;
+        String conditionFormat = "@site=\"%s\"";
         String condition = null;
         if (jobDefId != null) {
             conditionFormat = conditionFormat + " AND @jobDefId=\"%s\"";
@@ -144,12 +150,12 @@ public class MRJobExecutionResource {
         }
         LOG.debug("search condition=" + condition);
 
-        final Map<String,Object> meta = new HashMap<>();
+        final Map<String, Object> meta = new HashMap<>();
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
         String queryFormat = "%s[%s]{*}";
         String queryString = String.format(queryFormat, Constants.JPA_JOB_EXECUTION_SERVICE_NAME, condition);
-        GenericServiceAPIResponseEntity<TaggedLogAPIEntity> res = resource.search(queryString, null, null, pageSize, null, false, true,  0L, 0, true, 0, null, false);
+        GenericServiceAPIResponseEntity<TaggedLogAPIEntity> res = resource.search(queryString, null, null, pageSize, null, false, true, 0L, 0, true, 0, null, false);
         if (res.isSuccess() && res.getObj() != null) {
             for (TaggedLogAPIEntity o : res.getObj()) {
                 jobs.add(o);
@@ -157,10 +163,10 @@ public class MRJobExecutionResource {
             }
         }
         queryString = String.format(queryFormat, Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, condition);
-        res = resource.search(queryString, null, null, pageSize, null, false, true,  0L, 0, true, 0, null, false);
+        res = resource.search(queryString, null, null, pageSize, null, false, true, 0L, 0, true, 0, null, false);
         if (res.isSuccess() && res.getObj() != null) {
             for (TaggedLogAPIEntity o : res.getObj()) {
-                if (! isDuplicate(jobIds, o)) {
+                if (!isDuplicate(jobIds, o)) {
                     jobs.add(o);
                 }
             }
@@ -181,128 +187,228 @@ public class MRJobExecutionResource {
             response.setException(new Exception(res.getException()));
         }
         meta.put(TOTAL_RESULTS, jobs.size());
-        meta.put(ELAPSEDMS,stopWatch.getTime());
+        meta.put(ELAPSEDMS, stopWatch.getTime());
         response.setObj(jobs);
         response.setMeta(meta);
         return response;
     }
 
-    public List<Long> parseTimeList(String timelist) {
-        List<Long> times = new ArrayList<>();
-        String [] strs = timelist.split("[,\\s]");
-        for (String str : strs) {
-            try {
-                times.add(Long.parseLong(str));
-            } catch (Exception ex) {
-                LOG.warn(str + " is not a number");
-            }
-        }
-        return times;
-    }
 
-    public int getPosition(List<Long> times, Long duration) {
-        duration = duration / 1000;
-        for (int i = 1; i < times.size(); i++) {
-            if (duration < times.get(i)) {
-                return i - 1;
-            }
-        }
-        return times.size() - 1;
-    }
 
-    public void getTopTasks(List<MRJobTaskGroupResponse.UnitTaskCount> list, long top) {
-        for (MRJobTaskGroupResponse.UnitTaskCount taskCounter : list) {
-            Iterator<TaskExecutionAPIEntity> iterator = taskCounter.entities.iterator();
-            for (int i = 0; i < top && iterator.hasNext(); i++) {
-                taskCounter.topEntities.add(iterator.next());
-            }
-            taskCounter.entities.clear();
-        }
-    }
-
-    public void initTaskCountList(List<MRJobTaskGroupResponse.UnitTaskCount> runningTaskCount,
-                                  List<MRJobTaskGroupResponse.UnitTaskCount> finishedTaskCount,
-                                  List<Long> times,
-                                  Comparator comparator) {
-        for (int i = 0; i < times.size(); i++) {
-            runningTaskCount.add(new MRJobTaskGroupResponse.UnitTaskCount(times.get(i), comparator));
-            finishedTaskCount.add(new MRJobTaskGroupResponse.UnitTaskCount(times.get(i), comparator));
-        }
-    }
 
     @GET
-    @Path("{jobId}/taskCounts")
+    @Path("{jobId}/taskCountsByDuration")
     @Produces(MediaType.APPLICATION_JSON)
-    public MRJobTaskGroupResponse getTaskCounts(@PathParam("jobId") String jobId,
-                                                @QueryParam("site") String site,
-                                                @QueryParam("timelineInSecs") String timeList,
-                                                @QueryParam("top") long top) {
-        MRJobTaskGroupResponse response = new MRJobTaskGroupResponse();
+    public TaskCountPerJobResponse getTaskCountsPerJob(@PathParam("jobId") String jobId,
+                                                       @QueryParam("site") String site,
+                                                       @QueryParam("timelineInSecs") String timeList,
+                                                       @QueryParam("top") long top) {
+        TaskCountPerJobResponse response = new TaskCountPerJobResponse();
         if (jobId == null || site == null || timeList == null || timeList.isEmpty()) {
             response.errMessage = "IllegalArgumentException: jobId == null || site == null || timelineInSecs == null or isEmpty";
             return response;
         }
-        List<MRJobTaskGroupResponse.UnitTaskCount> runningTaskCount = new ArrayList<>();
-        List<MRJobTaskGroupResponse.UnitTaskCount> finishedTaskCount = new ArrayList<>();
+        TaskCountByDurationHelper helper = new TaskCountByDurationHelper();
+        List<MRJobTaskCountResponse.UnitTaskCount> runningTaskCount = new ArrayList<>();
+        List<MRJobTaskCountResponse.UnitTaskCount> finishedTaskCount = new ArrayList<>();
 
-        List<Long> times = parseTimeList(timeList);
+        List<Long> times = helper.parseTimeList(timeList);
         String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, jobId);
         GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> historyRes =
-                resource.search(query,  null, null, Integer.MAX_VALUE, null, false, true,  0L, 0, true, 0, null, false);
+            resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
         if (historyRes.isSuccess() && historyRes.getObj() != null && historyRes.getObj().size() > 0) {
-            initTaskCountList(runningTaskCount, finishedTaskCount, times, new HistoryTaskComparator());
+            helper.initTaskCountList(runningTaskCount, finishedTaskCount, times, new TaskCountByDurationHelper.HistoryTaskComparator());
             for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o : historyRes.getObj()) {
-                int index = getPosition(times, o.getDuration());
-                MRJobTaskGroupResponse.UnitTaskCount counter = finishedTaskCount.get(index);
-                counter.taskCount++;
+                int index = helper.getPosition(times, o.getDuration());
+                MRJobTaskCountResponse.UnitTaskCount counter = finishedTaskCount.get(index);
+                helper.countTask(counter, o.getTags().get(TASK_TYPE.toString()));
                 counter.entities.add(o);
             }
         } else {
             query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME, site, jobId);
             GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> runningRes =
-                    resource.search(query,  null, null, Integer.MAX_VALUE, null, false, true,  0L, 0, true, 0, null, false);
+                resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
             if (runningRes.isSuccess() && runningRes.getObj() != null) {
-                initTaskCountList(runningTaskCount, finishedTaskCount, times, new RunningTaskComparator());
+                helper.initTaskCountList(runningTaskCount, finishedTaskCount, times, new TaskCountByDurationHelper.RunningTaskComparator());
                 for (TaskExecutionAPIEntity o : runningRes.getObj()) {
-                    int index = getPosition(times, o.getDuration());
+                    int index = helper.getPosition(times, o.getDuration());
                     if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) {
-                        MRJobTaskGroupResponse.UnitTaskCount counter = runningTaskCount.get(index);
-                        counter.taskCount++;
+                        MRJobTaskCountResponse.UnitTaskCount counter = runningTaskCount.get(index);
+                        helper.countTask(counter, o.getTags().get(TASK_TYPE.toString()));
                         counter.entities.add(o);
                     } else if (o.getEndTime() != 0) {
-                        MRJobTaskGroupResponse.UnitTaskCount counter = finishedTaskCount.get(index);
-                        counter.taskCount++;
+                        MRJobTaskCountResponse.UnitTaskCount counter = finishedTaskCount.get(index);
+                        helper.countTask(counter, o.getTags().get(TASK_TYPE.toString()));
                         counter.entities.add(o);
                     }
                 }
             }
         }
-        if (top > 0)  {
-            getTopTasks(runningTaskCount, top);
+        if (top > 0) {
+            helper.getTopTasks(runningTaskCount, top);
             response.runningTaskCount = runningTaskCount;
-            getTopTasks(finishedTaskCount, top);
+            helper.getTopTasks(finishedTaskCount, top);
             response.finishedTaskCount = finishedTaskCount;
         }
+        response.topNumber = top;
         return response;
     }
 
-    static class RunningTaskComparator implements Comparator<TaskExecutionAPIEntity> {
-        @Override
-        public int compare(TaskExecutionAPIEntity o1, TaskExecutionAPIEntity o2) {
-            Long time1 = o1.getDuration();
-            Long time2 = o2.getDuration();
-            return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1);
+    @GET
+    @Path("runningJobCounts")
+    @Produces(MediaType.APPLICATION_JSON)
+    public JobCountResponse getRunningJobCount(@QueryParam("site") String site,
+                                               @QueryParam("durationBegin") String startTime,
+                                               @QueryParam("durationEnd") String endTime,
+                                               @QueryParam("intervalInSecs") long intervalInSecs) {
+        JobCountResponse response = new JobCountResponse();
+        MRJobCountHelper helper = new MRJobCountHelper();
+        if (site == null || startTime == null || endTime == null) {
+            response.errMessage = "IllegalArgument: site, durationBegin, durationEnd is null";
+            return response;
+        }
+        if (intervalInSecs <= 0) {
+            response.errMessage = String.format("IllegalArgument: intervalInSecs=%s is invalid", intervalInSecs);
+            return response;
+        }
+        long startTimeInMills;
+        String searchStartTime = startTime;
+        String searchEndTime = endTime;
+        try {
+            startTimeInMills = DateTimeUtil.humanDateToSeconds(startTime) * DateTimeUtil.ONESECOND;
+            searchStartTime = helper.moveTimeforwardOneDay(searchStartTime);
+        } catch (Exception e) {
+            response.errMessage = e.getMessage();
+            return response;
+        }
+        String query = String.format("%s[@site=\"%s\" AND @endTime>=%s]{@startTime,@endTime,@jobType}", Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, startTimeInMills);
+        GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes =
+            resource.search(query, searchStartTime, searchEndTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
+        if (!historyRes.isSuccess() || historyRes.getObj() == null) {
+            response.errMessage = String.format("Catch an exception: %s with query=%s", historyRes.getException(), query);
+            return response;
         }
+
+        try {
+            long startTimeInSecs = DateTimeUtil.humanDateToSeconds(startTime);
+            long endTimeInSecs = DateTimeUtil.humanDateToSeconds(endTime);
+            return helper.getRunningJobCount(historyRes.getObj(), startTimeInSecs, endTimeInSecs, intervalInSecs);
+        } catch (Exception e) {
+            response.errMessage = e.getMessage();
+            return response;
+        }
+    }
+
+    @GET
+    @Path("jobMetrics/entities")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Object getJobMetricsByEntitiesQuery(@QueryParam("site") String site,
+                                               @QueryParam("timePoint") String timePoint,
+                                               @QueryParam("metricName") String metricName,
+                                               @QueryParam("intervalmin") long intervalmin,
+                                               @QueryParam("top") int top) {
+        return getJobMetrics(site, timePoint, metricName, intervalmin, top, queryMetricEntitiesFunc);
     }
 
-    static class HistoryTaskComparator implements Comparator<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> {
-        @Override
-        public int compare(org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o1,
-                           org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o2) {
-            Long time1 = o1.getDuration();
-            Long time2 = o2.getDuration();
-            return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1);
+    @GET
+    @Path("jobMetrics/list")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Object getJobMetricsByListQuery(@QueryParam("site") String site,
+                                           @QueryParam("timePoint") String timePoint,
+                                           @QueryParam("metricName") String metricName,
+                                           @QueryParam("intervalmin") long intervalmin,
+                                           @QueryParam("top") int top) {
+        return getJobMetrics(site, timePoint, metricName, intervalmin, top, queryMetricListFunc);
+    }
+
+    public Object getJobMetrics(String site, String timePoint, String metricName, long intervalmin, int top,
+                                Function6<String, String, String, Long, Integer, String, Object> metricQueryFunc) {
+        GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity();
+        MRJobCountHelper helper = new MRJobCountHelper();
+        if (site == null || timePoint == null || metricName == null) {
+            response.setException(new IllegalArgumentException("Error: site, timePoint, metricName may be unset"));
+            response.setSuccess(false);
+            return response;
+        }
+        if (intervalmin <= 0) {
+            LOG.warn("query parameter intervalmin <= 0, use default value 5 instead");
+            intervalmin = 5;
         }
+        if (top <= 0) {
+            LOG.warn("query parameter top <= 0, use default value 10 instead");
+            top = 10;
+        }
+
+        long timePointsInMills;
+        String searchStartTime = timePoint;
+        String searchEndTime = timePoint;
+        try {
+            timePointsInMills = DateTimeUtil.humanDateToSeconds(timePoint) * DateTimeUtil.ONESECOND;
+            searchStartTime = helper.moveTimeforwardOneDay(searchStartTime);
+        } catch (ParseException e) {
+            response.setException(e);
+            response.setSuccess(false);
+            return response;
+        }
+        String query = String.format("%s[@site=\"%s\" AND @startTime<=\"%s\" AND @endTime>=\"%s\"]{@startTime,@endTime}",
+            Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, timePointsInMills, timePointsInMills);
+        GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes =
+            resource.search(query, searchStartTime, searchEndTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
+        if (!historyRes.isSuccess() || historyRes.getObj() == null) {
+            return historyRes;
+        }
+
+        List<String> timeDuration = helper.getSearchTimeDuration(historyRes.getObj());
+        LOG.info(String.format("new search time range: startTime=%s, endTime=%s", timeDuration.get(0), timeDuration.get(1)));
+        query = String.format("%s[@site=\"%s\"]<@jobId>{sum(value)}.{sum(value) desc}", Constants.GENERIC_METRIC_SERVICE, site);
+        return metricQueryFunc.apply(query, timeDuration.get(0), timeDuration.get(1), intervalmin, top, metricName);
     }
 
+    Function6<String, String, String, Long, Integer, String, Object> queryMetricEntitiesFunc
+        = (query, startTime, endTime, intervalmin, top, metricName) -> {
+            GenericEntityServiceResource resource = new GenericEntityServiceResource();
+            return resource.search(query, startTime, endTime, Integer.MAX_VALUE, null,
+            false, true, intervalmin, top, true, 0, metricName, false);
+        };
+
+    Function6<String, String, String, Long, Integer, String, Object> queryMetricListFunc
+        = (query, startTime, endTime, intervalmin, top, metricName) -> {
+            ListQueryResource resource = new ListQueryResource();
+            return resource.listQuery(query, startTime, endTime, Integer.MAX_VALUE, null,
+            false, true, intervalmin, top, true, 0, metricName, false);
+        };
+
+    @FunctionalInterface
+    interface Function6<A, B, C, D, E, F, R> {
+        R apply(A a, B b, C c, D d, E e, F f);
+    }
+
+    @GET
+    @Path("jobCountsByDuration")
+    @Produces(MediaType.APPLICATION_JSON)
+    public JobCountResponse getJobCountGroupByDuration(@QueryParam("site") String site,
+                                                       @QueryParam("timelineInSecs") String timeList,
+                                                       @QueryParam("jobStartTimeBegin") String startTime,
+                                                       @QueryParam("jobStartTimeEnd") String endTime) {
+        JobCountResponse response = new JobCountResponse();
+        MRJobCountHelper helper = new MRJobCountHelper();
+        if (site == null || startTime == null || endTime == null || timeList == null) {
+            response.errMessage = "IllegalArgument: site, jobStartTimeBegin, jobStartTimeEnd, or timelineInSecs is null";
+            return response;
+        }
+        String query = String.format("%s[@site=\"%s\"]{@durationTime,@jobType}", Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site);
+        GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes =
+            resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
+        if (!historyRes.isSuccess() || historyRes.getObj() == null) {
+            response.errMessage = String.format("Catch an exception: %s with query=%s", historyRes.getException(), query);
+            return response;
+        }
+        try {
+            return helper.getHistoryJobCount(historyRes.getObj(), timeList);
+        } catch (Exception e) {
+            response.errMessage = e.getMessage();
+            return response;
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
new file mode 100644
index 0000000..c546198
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
@@ -0,0 +1,65 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import java.util.*;
+
+public class MRJobTaskCountResponse {
+    public String errMessage;
+
+    public static class TaskCountPerJobResponse extends MRJobTaskCountResponse {
+        public long topNumber;
+        public List<UnitTaskCount> runningTaskCount;
+        public List<UnitTaskCount> finishedTaskCount;
+    }
+
+    public static class JobCountResponse extends MRJobTaskCountResponse {
+        public List<UnitJobCount> jobCounts;
+    }
+
+    static class UnitTaskCount {
+        public long timeBucket;
+        public int taskCount;
+        public int mapTaskCount;
+        public int reduceTaskCount;
+        public Set entities;
+        public List topEntities;
+
+        UnitTaskCount(long timeBucket, Comparator comparator) {
+            this.timeBucket = timeBucket;
+            this.taskCount = 0;
+            this.mapTaskCount = 0;
+            this.reduceTaskCount = 0;
+            entities = new TreeSet<>(comparator);
+            topEntities = new ArrayList<>();
+        }
+    }
+
+    static class UnitJobCount {
+        public long timeBucket;
+        public long jobCount;
+        public Map<String, Long> jobCountByType;
+
+        UnitJobCount(long timeBucket) {
+            this.timeBucket = timeBucket;
+            this.jobCount = 0;
+            this.jobCountByType = new HashMap<>();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java
deleted file mode 100644
index 3be9b43..0000000
--- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.eagle.service.jpm;
-
-import java.util.*;
-
-class MRJobTaskGroupResponse {
-    public List<UnitTaskCount> runningTaskCount;
-    public List<UnitTaskCount> finishedTaskCount;
-    public String errMessage;
-
-   static class UnitTaskCount {
-        public long timeBucket;
-        public int taskCount;
-        public Set entities;
-        public List topEntities;
-
-        UnitTaskCount(long timeBucket, Comparator comparator) {
-            this.timeBucket = timeBucket;
-            this.taskCount = 0;
-            entities = new TreeSet<>(comparator);
-            topEntities = new ArrayList<>();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java
new file mode 100644
index 0000000..0eeb440
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java
@@ -0,0 +1,106 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+public class TaskCountByDurationHelper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TaskCountByDurationHelper.class);
+
+    public static List<Long> parseTimeList(String timelist) {
+        List<Long> times = new ArrayList<>();
+        String [] strs = timelist.split("[,\\s]");
+        for (String str : strs) {
+            try {
+                times.add(Long.parseLong(str));
+            } catch (Exception ex) {
+                LOG.warn(str + " is not a number");
+            }
+        }
+        return times;
+    }
+
+    public static int getPosition(List<Long> times, Long duration) {
+        duration = duration / 1000;
+        for (int i = 1; i < times.size(); i++) {
+            if (duration < times.get(i)) {
+                return i - 1;
+            }
+        }
+        return times.size() - 1;
+    }
+
+    public void getTopTasks(List<MRJobTaskCountResponse.UnitTaskCount> list, long top) {
+        for (MRJobTaskCountResponse.UnitTaskCount taskCounter : list) {
+            Iterator<TaskExecutionAPIEntity> iterator = taskCounter.entities.iterator();
+            for (int i = 0; i < top && iterator.hasNext(); i++) {
+                taskCounter.topEntities.add(iterator.next());
+            }
+            taskCounter.entities.clear();
+        }
+    }
+
+    public void countTask(MRJobTaskCountResponse.UnitTaskCount counter, String taskType) {
+        counter.taskCount++;
+        if (taskType.equalsIgnoreCase(Constants.TaskType.MAP.toString())) {
+            counter.mapTaskCount++;
+        } else if (taskType.equalsIgnoreCase(Constants.TaskType.REDUCE.toString())) {
+            counter.reduceTaskCount++;
+        }
+    }
+
+    public void initTaskCountList(List<MRJobTaskCountResponse.UnitTaskCount> runningTaskCount,
+                                  List<MRJobTaskCountResponse.UnitTaskCount> finishedTaskCount,
+                                  List<Long> times,
+                                  Comparator comparator) {
+        for (int i = 0; i < times.size(); i++) {
+            runningTaskCount.add(new MRJobTaskCountResponse.UnitTaskCount(times.get(i), comparator));
+            finishedTaskCount.add(new MRJobTaskCountResponse.UnitTaskCount(times.get(i), comparator));
+        }
+    }
+
+    static class RunningTaskComparator implements Comparator<TaskExecutionAPIEntity> {
+        @Override
+        public int compare(TaskExecutionAPIEntity o1, TaskExecutionAPIEntity o2) {
+            Long time1 = o1.getDuration();
+            Long time2 = o2.getDuration();
+            return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1);
+        }
+    }
+
+    static class HistoryTaskComparator implements Comparator<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> {
+        @Override
+        public int compare(org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o1,
+                           org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o2) {
+            Long time1 = o1.getDuration();
+            Long time2 = o2.getDuration();
+            return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
new file mode 100644
index 0000000..718f068
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
@@ -0,0 +1,87 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestJobCountPerBucketHelper {
+    MRJobCountHelper helper = new MRJobCountHelper();
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestJobCountPerBucketHelper.class);
+
+    @Test
+    public void test() throws ParseException {
+        String timeString = "2016-08-22 20:13:00";
+        long timestamp = DateTimeUtil.humanDateToSeconds(timeString);
+        String timeString2 = DateTimeUtil.secondsToHumanDate(timestamp);
+        Assert.assertTrue(timeString2.equals(timeString));
+
+        String timeString3 = helper.moveTimeforwardOneDay(timeString);
+        Assert.assertTrue(timeString3.equals("2016-08-21 20:13:00"));
+    }
+
+    @Test
+    public void test2() throws ParseException {
+        String startTime = "2016-08-22 20:13:00";
+        String endTime = "2016-08-22 24:13:00";
+        List<MRJobTaskCountResponse.UnitJobCount> jobCounts = new ArrayList<>();
+        helper.initJobCountList(jobCounts, DateTimeUtil.humanDateToSeconds(startTime), DateTimeUtil.humanDateToSeconds(endTime), 15 * 60);
+        /*for (MRJobTaskCountResponse.UnitJobCount jobCount : jobCounts) {
+            LOG.info(DateTimeUtil.secondsToHumanDate(jobCount.timeBucket));
+        }*/
+        Assert.assertTrue(DateTimeUtil.secondsToHumanDate(jobCounts.get(1).timeBucket).equals("2016-08-22 20:15:00"));
+    }
+
+    @Test
+    public void test3() {
+        List<MRJobTaskCountResponse.UnitJobCount> jobCounts = new ArrayList<>();
+        long intervalSecs = 5;
+        helper.initJobCountList(jobCounts, 3, 31, intervalSecs);
+        helper.countJob(jobCounts, 5, 10, intervalSecs, "hive");
+        helper.countJob(jobCounts, 13, 18, intervalSecs, "hive");
+        helper.countJob(jobCounts, 18, 28, intervalSecs, "hive");
+        helper.countJob(jobCounts, 25, 33, intervalSecs, "hive");
+        Assert.assertTrue(jobCounts.size() == 7);
+        Assert.assertTrue(jobCounts.get(1).jobCount == 1);
+        Assert.assertTrue(jobCounts.get(5).jobCount == 2);
+    }
+
+    @Test
+    public void test4() throws ParseException {
+        List<MRJobTaskCountResponse.UnitJobCount> jobCounts = new ArrayList<>();
+        long intervalSecs = 60 * 15;
+        String startTime = "2016-08-22 20:13:00";
+        String endTime = "2016-08-22 24:13:00";
+        helper.initJobCountList(jobCounts, DateTimeUtil.humanDateToSeconds(startTime), DateTimeUtil.humanDateToSeconds(endTime), intervalSecs);
+        helper.countJob(jobCounts,
+                DateTimeUtil.humanDateToSeconds("2016-08-22 20:23:00"),
+                DateTimeUtil.humanDateToSeconds("2016-08-22 20:30:00"),
+                intervalSecs,
+                "hive");
+        Assert.assertTrue(jobCounts.get(2).jobCount == 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java
deleted file mode 100644
index 824556b..0000000
--- a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.eagle.service.jpm;
-
-import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
-import org.apache.eagle.jpm.util.Constants;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-
-public class TestMRJobExecutionResource {
-
-    @Test
-    public void test() {
-        MRJobExecutionResource resource = new MRJobExecutionResource();
-        String timeList = " 0, 10,20,40 ";
-        List<Long> times = resource.parseTimeList(timeList);
-        Assert.assertTrue(times.size() == 4);
-
-        long val = 25 * 1000;
-        int index = resource.getPosition(times, val);
-        Assert.assertTrue(index == 2);
-    }
-
-    @Test
-    public void test2() {
-        MRJobExecutionResource resource = new MRJobExecutionResource();
-        String timeList = " 0, 10,20,40 ";
-        List<Long> times = resource.parseTimeList(timeList);
-
-        TaskExecutionAPIEntity test1 = new TaskExecutionAPIEntity();
-        test1.setDuration(15 * 1000);
-        test1.setTaskStatus("running");
-        TaskExecutionAPIEntity test4 = new TaskExecutionAPIEntity();
-        test4.setDuration(13 * 1000);
-        test4.setTaskStatus("running");
-        TaskExecutionAPIEntity test2 = new TaskExecutionAPIEntity();
-        test2.setDuration(0 * 1000);
-        test2.setEndTime(100);
-        test2.setTaskStatus("x");
-        TaskExecutionAPIEntity test3 = new TaskExecutionAPIEntity();
-        test3.setDuration(19 * 1000);
-        test3.setTaskStatus("running");
-        TaskExecutionAPIEntity test5 = new TaskExecutionAPIEntity();
-        test5.setDuration(20 * 1000);
-        test5.setEndTime(28);
-        test5.setTaskStatus("x");
-        List<TaskExecutionAPIEntity> tasks = new ArrayList<>();
-        tasks.add(test1);
-        tasks.add(test2);
-        tasks.add(test3);
-        tasks.add(test4);
-        tasks.add(test5);
-
-        List<MRJobTaskGroupResponse.UnitTaskCount> runningTaskCount = new ArrayList<>();
-        List<MRJobTaskGroupResponse.UnitTaskCount> finishedTaskCount = new ArrayList<>();
-
-        Comparator comparator = new MRJobExecutionResource.RunningTaskComparator();
-        resource.initTaskCountList(runningTaskCount, finishedTaskCount, times, comparator);
-
-        for (TaskExecutionAPIEntity o : tasks) {
-            int index = resource.getPosition(times, o.getDuration());
-            if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) {
-                MRJobTaskGroupResponse.UnitTaskCount counter = runningTaskCount.get(index);
-                counter.taskCount++;
-                counter.entities.add(o);
-            } else if (o.getEndTime() != 0) {
-                MRJobTaskGroupResponse.UnitTaskCount counter = finishedTaskCount.get(index);
-                counter.taskCount++;
-                counter.entities.add(o);
-            }
-        }
-        int top = 2;
-        if (top > 0)  {
-            resource.getTopTasks(runningTaskCount, top);
-        }
-        Assert.assertTrue(runningTaskCount.get(1).taskCount == 3);
-        Assert.assertTrue(runningTaskCount.get(1).topEntities.size() == 2);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java
new file mode 100644
index 0000000..2cd0b8e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java
@@ -0,0 +1,96 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestTaskCountPerJobHelper {
+    TaskCountByDurationHelper helper = new TaskCountByDurationHelper();
+
+    @Test
+    public void test() {
+        String timeList = " 0, 10,20,40 ";
+        List<Long> times = helper.parseTimeList(timeList);
+        Assert.assertTrue(times.size() == 4);
+
+        long val = 25 * 1000;
+        int index = helper.getPosition(times, val);
+        Assert.assertTrue(index == 2);
+    }
+
+    @Test
+    public void test2() {
+        TaskExecutionAPIEntity test1 = new TaskExecutionAPIEntity();
+        test1.setDuration(15 * 1000);
+        test1.setTaskStatus("running");
+        TaskExecutionAPIEntity test4 = new TaskExecutionAPIEntity();
+        test4.setDuration(13 * 1000);
+        test4.setTaskStatus("running");
+        TaskExecutionAPIEntity test2 = new TaskExecutionAPIEntity();
+        test2.setDuration(0 * 1000);
+        test2.setEndTime(100);
+        test2.setTaskStatus("x");
+        TaskExecutionAPIEntity test3 = new TaskExecutionAPIEntity();
+        test3.setDuration(19 * 1000);
+        test3.setTaskStatus("running");
+        TaskExecutionAPIEntity test5 = new TaskExecutionAPIEntity();
+        test5.setDuration(20 * 1000);
+        test5.setEndTime(28);
+        test5.setTaskStatus("x");
+        List<TaskExecutionAPIEntity> tasks = new ArrayList<>();
+        tasks.add(test1);
+        tasks.add(test2);
+        tasks.add(test3);
+        tasks.add(test4);
+        tasks.add(test5);
+
+        List<MRJobTaskCountResponse.UnitTaskCount> runningTaskCount = new ArrayList<>();
+        List<MRJobTaskCountResponse.UnitTaskCount> finishedTaskCount = new ArrayList<>();
+
+        String timeList = " 0, 10,20,40 ";
+        List<Long> times = helper.parseTimeList(timeList);
+
+        helper.initTaskCountList(runningTaskCount, finishedTaskCount, times, new TaskCountByDurationHelper.RunningTaskComparator());
+
+        for (TaskExecutionAPIEntity o : tasks) {
+            int index = helper.getPosition(times, o.getDuration());
+            if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) {
+                MRJobTaskCountResponse.UnitTaskCount counter = runningTaskCount.get(index);
+                counter.taskCount++;
+                counter.entities.add(o);
+            } else if (o.getEndTime() != 0) {
+                MRJobTaskCountResponse.UnitTaskCount counter = finishedTaskCount.get(index);
+                counter.taskCount++;
+                counter.entities.add(o);
+            }
+        }
+        int top = 2;
+        if (top > 0)  {
+            helper.getTopTasks(runningTaskCount, top);
+        }
+        Assert.assertTrue(runningTaskCount.get(1).taskCount == 3);
+        Assert.assertTrue(runningTaskCount.get(1).topEntities.size() == 2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index 7dce0a2..ec56eac 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -23,6 +23,8 @@ import org.slf4j.LoggerFactory;
 public class Constants {
     private static final Logger LOG = LoggerFactory.getLogger(Constants.class);
 
+    public static final String GENERIC_METRIC_SERVICE = "GenericMetricService";
+
     //SPARK
     public static final String SPARK_APP_SERVICE_ENDPOINT_NAME = "SparkAppService";
     public static final String SPARK_JOB_SERVICE_ENDPOINT_NAME = "SparkJobService";