You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:37 UTC

[41/52] [abbrv] incubator-eagle git commit: add jobTypes list in response

add jobTypes list in response

Author: Qingwen Zhao <qi...@gmail.com>

Closes #421 from qingwen220/jobAPIUpdate.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/fce6ae3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/fce6ae3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/fce6ae3f

Branch: refs/heads/master
Commit: fce6ae3f0c33ded811e78ca4bf8ff4c0a7ff3935
Parents: 3110c72
Author: Qingwen Zhao <qi...@gmail.com>
Authored: Wed Sep 7 11:07:17 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Wed Sep 7 11:07:17 2016 +0800

----------------------------------------------------------------------
 .../eagle/service/jpm/MRJobCountHelper.java     | 31 ++++++++++++++++----
 .../service/jpm/MRJobExecutionResource.java     | 16 +++++++---
 .../service/jpm/MRJobTaskCountResponse.java     |  1 +
 .../jpm/TestJobCountPerBucketHelper.java        |  7 +++--
 4 files changed, 43 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fce6ae3f/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
index 93c6c00..2fa5c04 100644
--- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
@@ -20,45 +20,61 @@ package org.apache.eagle.service.jpm;
 
 import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.MRJobTagName;
 import org.apache.eagle.service.jpm.MRJobTaskCountResponse.JobCountResponse;
 import org.apache.eagle.service.jpm.MRJobTaskCountResponse.UnitJobCount;
 
 import java.text.ParseException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 public class MRJobCountHelper {
 
     public void initJobCountList(List<UnitJobCount> jobCounts, long startTime, long endTime, long intervalInSecs) {
         for (long i = startTime / intervalInSecs; i * intervalInSecs <= endTime; i++) {
-            jobCounts.add(new UnitJobCount(i * intervalInSecs));
+            jobCounts.add(new UnitJobCount(i * intervalInSecs * DateTimeUtil.ONESECOND));
         }
     }
 
-    public String moveTimeforwardOneDay(String startTime) throws ParseException {
+    public String moveTimeForwardOneDay(String startTime) throws ParseException {
         long timeInSecs = DateTimeUtil.humanDateToSeconds(startTime);
         timeInSecs -= 24L * 60L * 60L;
         return DateTimeUtil.secondsToHumanDate(timeInSecs);
     }
 
     public JobCountResponse getRunningJobCount(List<JobExecutionAPIEntity> jobDurations,
+                                               List<org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity> runningJobs,
                                         long startTimeInSecs,
                                         long endTimeInSecs,
                                         long intervalInSecs) {
-        JobCountResponse response = new JobCountResponse();
         List<UnitJobCount> jobCounts = new ArrayList<>();
+        Set<String> jobTypes = new HashSet<>();
         initJobCountList(jobCounts, startTimeInSecs, endTimeInSecs, intervalInSecs);
         for (JobExecutionAPIEntity jobDuration: jobDurations) {
-            countJob(jobCounts, jobDuration.getStartTime() / 1000, jobDuration.getEndTime() / 1000, intervalInSecs, jobDuration.getTags().get(MRJobTagName.JOB_TYPE.toString()));
+            String jobType = jobDuration.getTags().get(MRJobTagName.JOB_TYPE.toString());
+            jobTypes.add(jobType);
+            countJob(jobCounts, jobDuration.getStartTime() / 1000, jobDuration.getEndTime() / 1000, intervalInSecs, jobType);
+        }
+        for (org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity job : runningJobs) {
+            if (job.getInternalState() != null && !job.getInternalState().equalsIgnoreCase(Constants.JobState.FINISHED.toString())) {
+                String jobType = job.getTags().get(MRJobTagName.JOB_TYPE.toString());
+                jobTypes.add(jobType);
+                countJob(jobCounts, job.getStartTime() / 1000, endTimeInSecs, intervalInSecs, jobType);
+            }
         }
+        JobCountResponse response = new JobCountResponse();
         response.jobCounts = jobCounts;
+        response.jobTypes = jobTypes;
         return response;
     }
 
     public JobCountResponse getHistoryJobCount(List<JobExecutionAPIEntity> jobDurations, String timeList) {
         JobCountResponse response = new JobCountResponse();
         List<UnitJobCount> jobCounts = new ArrayList<>();
+        Set<String> jobTypes = new HashSet<>();
         List<Long> times = TaskCountByDurationHelper.parseTimeList(timeList);
         for (int i = 0; i < times.size(); i++) {
             jobCounts.add(new UnitJobCount(times.get(i)));
@@ -66,9 +82,12 @@ public class MRJobCountHelper {
         for (JobExecutionAPIEntity job : jobDurations) {
             int jobIndex = TaskCountByDurationHelper.getPosition(times, job.getDurationTime());
             UnitJobCount counter = jobCounts.get(jobIndex);
-            countJob(counter, job.getTags().get(MRJobTagName.JOB_TYPE.toString()));
+            String jobType = job.getTags().get(MRJobTagName.JOB_TYPE.toString());
+            jobTypes.add(jobType);
+            countJob(counter, jobType);
         }
         response.jobCounts = jobCounts;
+        response.jobTypes = jobTypes;
         return response;
     }
 
@@ -85,7 +104,7 @@ public class MRJobCountHelper {
     }
 
     public void countJob(List<UnitJobCount> jobCounts, long jobStartTimeSecs, long jobEndTimeSecs, long intervalInSecs, String jobType) {
-        long startCountPoint = jobCounts.get(0).timeBucket;
+        long startCountPoint = jobCounts.get(0).timeBucket / DateTimeUtil.ONESECOND;
         if (jobEndTimeSecs < startCountPoint) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fce6ae3f/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
index 5af9811..e6041f2 100644
--- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
@@ -276,7 +276,7 @@ public class MRJobExecutionResource {
         String searchEndTime = endTime;
         try {
             startTimeInMills = DateTimeUtil.humanDateToSeconds(startTime) * DateTimeUtil.ONESECOND;
-            searchStartTime = helper.moveTimeforwardOneDay(searchStartTime);
+            searchStartTime = helper.moveTimeForwardOneDay(searchStartTime);
         } catch (Exception e) {
             response.errMessage = e.getMessage();
             return response;
@@ -285,15 +285,23 @@ public class MRJobExecutionResource {
         GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes =
             resource.search(query, searchStartTime, searchEndTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
         if (!historyRes.isSuccess() || historyRes.getObj() == null) {
-            response.errMessage = String.format("Catch an exception: %s with query=%s", historyRes.getException(), query);
+            response.errMessage = String.format("Catch an exception during fetch history jobs: %s with query=%s", historyRes.getException(), query);
+            return response;
+        }
+        query = String.format("%s[@site=\"%s\"]{@startTime,@endTime,@jobType}", Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, site);
+        GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity> runningRes =
+            resource.search(query, searchStartTime, searchEndTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
+        if (!runningRes.isSuccess() || runningRes.getObj() == null) {
+            response.errMessage = String.format("Catch an exception during fetch running jobs: %s with query=%s", runningRes.getException(), query);
             return response;
         }
 
         try {
             long startTimeInSecs = DateTimeUtil.humanDateToSeconds(startTime);
             long endTimeInSecs = DateTimeUtil.humanDateToSeconds(endTime);
-            return helper.getRunningJobCount(historyRes.getObj(), startTimeInSecs, endTimeInSecs, intervalInSecs);
+            return helper.getRunningJobCount(historyRes.getObj(), runningRes.getObj(), startTimeInSecs, endTimeInSecs, intervalInSecs);
         } catch (Exception e) {
+            e.printStackTrace();
             response.errMessage = e.getMessage();
             return response;
         }
@@ -344,7 +352,7 @@ public class MRJobExecutionResource {
         String searchEndTime = timePoint;
         try {
             timePointsInMills = DateTimeUtil.humanDateToSeconds(timePoint) * DateTimeUtil.ONESECOND;
-            searchStartTime = helper.moveTimeforwardOneDay(searchStartTime);
+            searchStartTime = helper.moveTimeForwardOneDay(searchStartTime);
         } catch (ParseException e) {
             response.setException(e);
             response.setSuccess(false);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fce6ae3f/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
index c546198..170533c 100644
--- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
@@ -30,6 +30,7 @@ public class MRJobTaskCountResponse {
     }
 
     public static class JobCountResponse extends MRJobTaskCountResponse {
+        public Set<String> jobTypes;
         public List<UnitJobCount> jobCounts;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fce6ae3f/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
index 718f068..c8d8869 100644
--- a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
+++ b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
@@ -40,8 +40,11 @@ public class TestJobCountPerBucketHelper {
         String timeString2 = DateTimeUtil.secondsToHumanDate(timestamp);
         Assert.assertTrue(timeString2.equals(timeString));
 
-        String timeString3 = helper.moveTimeforwardOneDay(timeString);
+        String timeString3 = helper.moveTimeForwardOneDay(timeString);
         Assert.assertTrue(timeString3.equals("2016-08-21 20:13:00"));
+
+        String timeString4 = helper.moveTimeForwardOneDay(timeString3);
+        Assert.assertTrue(timeString4.equals("2016-08-20 20:13:00"));
     }
 
     @Test
@@ -53,7 +56,7 @@ public class TestJobCountPerBucketHelper {
         /*for (MRJobTaskCountResponse.UnitJobCount jobCount : jobCounts) {
             LOG.info(DateTimeUtil.secondsToHumanDate(jobCount.timeBucket));
         }*/
-        Assert.assertTrue(DateTimeUtil.secondsToHumanDate(jobCounts.get(1).timeBucket).equals("2016-08-22 20:15:00"));
+        Assert.assertTrue(DateTimeUtil.millisecondsToHumanDateWithSeconds(jobCounts.get(1).timeBucket).equals("2016-08-22 20:15:00"));
     }
 
     @Test