You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/09/07 03:07:26 UTC
incubator-eagle git commit: add jobTypes list in response
Repository: incubator-eagle
Updated Branches:
refs/heads/develop 3110c72e4 -> fce6ae3f0
add jobTypes list in response
Author: Qingwen Zhao <qi...@gmail.com>
Closes #421 from qingwen220/jobAPIUpdate.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/fce6ae3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/fce6ae3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/fce6ae3f
Branch: refs/heads/develop
Commit: fce6ae3f0c33ded811e78ca4bf8ff4c0a7ff3935
Parents: 3110c72
Author: Qingwen Zhao <qi...@gmail.com>
Authored: Wed Sep 7 11:07:17 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Wed Sep 7 11:07:17 2016 +0800
----------------------------------------------------------------------
.../eagle/service/jpm/MRJobCountHelper.java | 31 ++++++++++++++++----
.../service/jpm/MRJobExecutionResource.java | 16 +++++++---
.../service/jpm/MRJobTaskCountResponse.java | 1 +
.../jpm/TestJobCountPerBucketHelper.java | 7 +++--
4 files changed, 43 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fce6ae3f/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
index 93c6c00..2fa5c04 100644
--- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
@@ -20,45 +20,61 @@ package org.apache.eagle.service.jpm;
import org.apache.eagle.common.DateTimeUtil;
import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.MRJobTagName;
import org.apache.eagle.service.jpm.MRJobTaskCountResponse.JobCountResponse;
import org.apache.eagle.service.jpm.MRJobTaskCountResponse.UnitJobCount;
import java.text.ParseException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
public class MRJobCountHelper {
public void initJobCountList(List<UnitJobCount> jobCounts, long startTime, long endTime, long intervalInSecs) {
for (long i = startTime / intervalInSecs; i * intervalInSecs <= endTime; i++) {
- jobCounts.add(new UnitJobCount(i * intervalInSecs));
+ jobCounts.add(new UnitJobCount(i * intervalInSecs * DateTimeUtil.ONESECOND));
}
}
- public String moveTimeforwardOneDay(String startTime) throws ParseException {
+ public String moveTimeForwardOneDay(String startTime) throws ParseException {
long timeInSecs = DateTimeUtil.humanDateToSeconds(startTime);
timeInSecs -= 24L * 60L * 60L;
return DateTimeUtil.secondsToHumanDate(timeInSecs);
}
public JobCountResponse getRunningJobCount(List<JobExecutionAPIEntity> jobDurations,
+ List<org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity> runningJobs,
long startTimeInSecs,
long endTimeInSecs,
long intervalInSecs) {
- JobCountResponse response = new JobCountResponse();
List<UnitJobCount> jobCounts = new ArrayList<>();
+ Set<String> jobTypes = new HashSet<>();
initJobCountList(jobCounts, startTimeInSecs, endTimeInSecs, intervalInSecs);
for (JobExecutionAPIEntity jobDuration: jobDurations) {
- countJob(jobCounts, jobDuration.getStartTime() / 1000, jobDuration.getEndTime() / 1000, intervalInSecs, jobDuration.getTags().get(MRJobTagName.JOB_TYPE.toString()));
+ String jobType = jobDuration.getTags().get(MRJobTagName.JOB_TYPE.toString());
+ jobTypes.add(jobType);
+ countJob(jobCounts, jobDuration.getStartTime() / 1000, jobDuration.getEndTime() / 1000, intervalInSecs, jobType);
+ }
+ for (org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity job : runningJobs) {
+ if (job.getInternalState() != null && !job.getInternalState().equalsIgnoreCase(Constants.JobState.FINISHED.toString())) {
+ String jobType = job.getTags().get(MRJobTagName.JOB_TYPE.toString());
+ jobTypes.add(jobType);
+ countJob(jobCounts, job.getStartTime() / 1000, endTimeInSecs, intervalInSecs, jobType);
+ }
}
+ JobCountResponse response = new JobCountResponse();
response.jobCounts = jobCounts;
+ response.jobTypes = jobTypes;
return response;
}
public JobCountResponse getHistoryJobCount(List<JobExecutionAPIEntity> jobDurations, String timeList) {
JobCountResponse response = new JobCountResponse();
List<UnitJobCount> jobCounts = new ArrayList<>();
+ Set<String> jobTypes = new HashSet<>();
List<Long> times = TaskCountByDurationHelper.parseTimeList(timeList);
for (int i = 0; i < times.size(); i++) {
jobCounts.add(new UnitJobCount(times.get(i)));
@@ -66,9 +82,12 @@ public class MRJobCountHelper {
for (JobExecutionAPIEntity job : jobDurations) {
int jobIndex = TaskCountByDurationHelper.getPosition(times, job.getDurationTime());
UnitJobCount counter = jobCounts.get(jobIndex);
- countJob(counter, job.getTags().get(MRJobTagName.JOB_TYPE.toString()));
+ String jobType = job.getTags().get(MRJobTagName.JOB_TYPE.toString());
+ jobTypes.add(jobType);
+ countJob(counter, jobType);
}
response.jobCounts = jobCounts;
+ response.jobTypes = jobTypes;
return response;
}
@@ -85,7 +104,7 @@ public class MRJobCountHelper {
}
public void countJob(List<UnitJobCount> jobCounts, long jobStartTimeSecs, long jobEndTimeSecs, long intervalInSecs, String jobType) {
- long startCountPoint = jobCounts.get(0).timeBucket;
+ long startCountPoint = jobCounts.get(0).timeBucket / DateTimeUtil.ONESECOND;
if (jobEndTimeSecs < startCountPoint) {
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fce6ae3f/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
index 5af9811..e6041f2 100644
--- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
@@ -276,7 +276,7 @@ public class MRJobExecutionResource {
String searchEndTime = endTime;
try {
startTimeInMills = DateTimeUtil.humanDateToSeconds(startTime) * DateTimeUtil.ONESECOND;
- searchStartTime = helper.moveTimeforwardOneDay(searchStartTime);
+ searchStartTime = helper.moveTimeForwardOneDay(searchStartTime);
} catch (Exception e) {
response.errMessage = e.getMessage();
return response;
@@ -285,15 +285,23 @@ public class MRJobExecutionResource {
GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes =
resource.search(query, searchStartTime, searchEndTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
if (!historyRes.isSuccess() || historyRes.getObj() == null) {
- response.errMessage = String.format("Catch an exception: %s with query=%s", historyRes.getException(), query);
+ response.errMessage = String.format("Catch an exception during fetch history jobs: %s with query=%s", historyRes.getException(), query);
+ return response;
+ }
+ query = String.format("%s[@site=\"%s\"]{@startTime,@endTime,@jobType}", Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, site);
+ GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity> runningRes =
+ resource.search(query, searchStartTime, searchEndTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
+ if (!runningRes.isSuccess() || runningRes.getObj() == null) {
+ response.errMessage = String.format("Catch an exception during fetch running jobs: %s with query=%s", runningRes.getException(), query);
return response;
}
try {
long startTimeInSecs = DateTimeUtil.humanDateToSeconds(startTime);
long endTimeInSecs = DateTimeUtil.humanDateToSeconds(endTime);
- return helper.getRunningJobCount(historyRes.getObj(), startTimeInSecs, endTimeInSecs, intervalInSecs);
+ return helper.getRunningJobCount(historyRes.getObj(), runningRes.getObj(), startTimeInSecs, endTimeInSecs, intervalInSecs);
} catch (Exception e) {
+ e.printStackTrace();
response.errMessage = e.getMessage();
return response;
}
@@ -344,7 +352,7 @@ public class MRJobExecutionResource {
String searchEndTime = timePoint;
try {
timePointsInMills = DateTimeUtil.humanDateToSeconds(timePoint) * DateTimeUtil.ONESECOND;
- searchStartTime = helper.moveTimeforwardOneDay(searchStartTime);
+ searchStartTime = helper.moveTimeForwardOneDay(searchStartTime);
} catch (ParseException e) {
response.setException(e);
response.setSuccess(false);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fce6ae3f/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
index c546198..170533c 100644
--- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
@@ -30,6 +30,7 @@ public class MRJobTaskCountResponse {
}
public static class JobCountResponse extends MRJobTaskCountResponse {
+ public Set<String> jobTypes;
public List<UnitJobCount> jobCounts;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fce6ae3f/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
index 718f068..c8d8869 100644
--- a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
+++ b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
@@ -40,8 +40,11 @@ public class TestJobCountPerBucketHelper {
String timeString2 = DateTimeUtil.secondsToHumanDate(timestamp);
Assert.assertTrue(timeString2.equals(timeString));
- String timeString3 = helper.moveTimeforwardOneDay(timeString);
+ String timeString3 = helper.moveTimeForwardOneDay(timeString);
Assert.assertTrue(timeString3.equals("2016-08-21 20:13:00"));
+
+ String timeString4 = helper.moveTimeForwardOneDay(timeString3);
+ Assert.assertTrue(timeString4.equals("2016-08-20 20:13:00"));
}
@Test
@@ -53,7 +56,7 @@ public class TestJobCountPerBucketHelper {
/*for (MRJobTaskCountResponse.UnitJobCount jobCount : jobCounts) {
LOG.info(DateTimeUtil.secondsToHumanDate(jobCount.timeBucket));
}*/
- Assert.assertTrue(DateTimeUtil.secondsToHumanDate(jobCounts.get(1).timeBucket).equals("2016-08-22 20:15:00"));
+ Assert.assertTrue(DateTimeUtil.millisecondsToHumanDateWithSeconds(jobCounts.get(1).timeBucket).equals("2016-08-22 20:15:00"));
}
@Test