You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/08/31 06:21:34 UTC
incubator-eagle git commit: [EAGLE-514] Add two job count apis
Repository: incubator-eagle
Updated Branches:
refs/heads/develop 4aa5b4526 -> a66f64cf9
[EAGLE-514] Add two job count apis
https://issues.apache.org/jira/browse/EAGLE-514
1. adding two job counting apis
2. add tracking url in running/history job execution entity
3. unify the status presentation for job/task execution entity
4. unify the name of the common fields between running job entity & history job entity
Author: Qingwen Zhao <qi...@gmail.com>
Closes #408 from qingwen220/jobStats.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/a66f64cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/a66f64cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/a66f64cf
Branch: refs/heads/develop
Commit: a66f64cf9f4212f4923f0e6ea6c7270449aa2ce4
Parents: 4aa5b45
Author: Qingwen Zhao <qi...@gmail.com>
Authored: Wed Aug 31 14:21:22 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Wed Aug 31 14:21:22 2016 +0800
----------------------------------------------------------------------
.../mr/historyentity/JobExecutionAPIEntity.java | 11 +
.../mr/runningentity/JobExecutionAPIEntity.java | 36 ++-
.../jpm/mr/history/MRHistoryJobConfig.java | 2 +
.../history/crawler/JHFCrawlerDriverImpl.java | 2 +-
.../jpm/mr/history/parser/EagleJobStatus.java | 2 +-
.../mr/history/parser/JHFEventReaderBase.java | 21 +-
.../mr/history/parser/JHFMRVer1EventReader.java | 5 +-
.../mr/history/parser/JHFMRVer2EventReader.java | 15 +-
.../jpm/mr/history/parser/JHFParserFactory.java | 4 +-
.../JobEntityCreationEagleServiceListener.java | 6 +-
.../src/main/resources/application.conf | 1 +
.../jpm/mr/running/parser/MRJobParser.java | 5 +-
.../eagle/service/jpm/MRJobCountHelper.java | 121 ++++++++
.../service/jpm/MRJobExecutionResource.java | 286 +++++++++++++------
.../service/jpm/MRJobTaskCountResponse.java | 65 +++++
.../service/jpm/MRJobTaskGroupResponse.java | 41 ---
.../service/jpm/TaskCountByDurationHelper.java | 106 +++++++
.../jpm/TestJobCountPerBucketHelper.java | 87 ++++++
.../service/jpm/TestMRJobExecutionResource.java | 99 -------
.../service/jpm/TestTaskCountPerJobHelper.java | 96 +++++++
.../org/apache/eagle/jpm/util/Constants.java | 2 +
21 files changed, 752 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
index 97e77b2..cdc5810 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobExecutionAPIEntity.java
@@ -89,6 +89,17 @@ public class JobExecutionAPIEntity extends JobBaseAPIEntity {
private int totalReduceAttempts;
@Column("ac")
private int failedReduceAttempts;
+ @Column("ad")
+ private String trackingUrl;
+
+ public String getTrackingUrl() {
+ return trackingUrl;
+ }
+
+ public void setTrackingUrl(String trackingUrl) {
+ this.trackingUrl = trackingUrl;
+ valueChanged("trackingUrl");
+ }
public long getDurationTime() {
return durationTime;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
index dd81eb4..245fc0f 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/runningentity/JobExecutionAPIEntity.java
@@ -49,11 +49,11 @@ public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
@Column("e")
private int numTotalMaps;
@Column("f")
- private int mapsCompleted;
+ private int numFinishedMaps;
@Column("g")
private int numTotalReduces;
@Column("h")
- private int reducesCompleted;
+ private int numFinishedReduces;
@Column("i")
private double mapProgress;
@Column("j")
@@ -112,6 +112,18 @@ public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
private long submissionTime;
@Column("ak")
private String internalState;
+ @Column("al")
+ private String trackingUrl;
+
+ public String getTrackingUrl() {
+ return trackingUrl;
+ }
+
+ public void setTrackingUrl(String trackingUrl) {
+ this.trackingUrl = trackingUrl;
+ valueChanged("trackingUrl");
+ }
+
public JobConfig getJobConfig() {
return jobConfig;
@@ -176,13 +188,13 @@ public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
valueChanged("numTotalMaps");
}
- public int getMapsCompleted() {
- return mapsCompleted;
+ public int getNumFinishedMaps() {
+ return numFinishedMaps;
}
- public void setMapsCompleted(int mapsCompleted) {
- this.mapsCompleted = mapsCompleted;
- valueChanged("mapsCompleted");
+ public void setNumFinishedMaps(int numFinishedMaps) {
+ this.numFinishedMaps = numFinishedMaps;
+ valueChanged("numFinishedMaps");
}
public int getNumTotalReduces() {
@@ -194,13 +206,13 @@ public class JobExecutionAPIEntity extends TaggedLogAPIEntity {
valueChanged("numTotalReduces");
}
- public int getReducesCompleted() {
- return reducesCompleted;
+ public int getNumFinishedReduces() {
+ return numFinishedReduces;
}
- public void setReducesCompleted(int reducesCompleted) {
- this.reducesCompleted = reducesCompleted;
- valueChanged("reducesCompleted");
+ public void setNumFinishedReduces(int numFinishedReduces) {
+ this.numFinishedReduces = numFinishedReduces;
+ valueChanged("numFinishedReduces");
}
public double getMapProgress() {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
index ae86904..c0943de 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
@@ -86,6 +86,7 @@ public class MRHistoryJobConfig implements Serializable {
public static class JobHistoryEndpointConfig implements Serializable {
public String nnEndpoint;
+ public String mrHistoryServerUrl;
public String basePath;
public boolean pathContainsJobTrackerName;
public String jobTrackerName;
@@ -173,6 +174,7 @@ public class MRHistoryJobConfig implements Serializable {
this.jobHistoryEndpointConfig.basePath = config.getString("dataSourceConfig.basePath");
this.jobHistoryEndpointConfig.jobTrackerName = config.getString("dataSourceConfig.jobTrackerName");
this.jobHistoryEndpointConfig.nnEndpoint = config.getString("dataSourceConfig.nnEndpoint");
+ this.jobHistoryEndpointConfig.mrHistoryServerUrl = config.getString("dataSourceConfig.mrHistoryServerUrl");
this.jobHistoryEndpointConfig.pathContainsJobTrackerName = config.getBoolean("dataSourceConfig.pathContainsJobTrackerName");
this.jobHistoryEndpointConfig.principal = config.getString("dataSourceConfig.principal");
this.jobHistoryEndpointConfig.keyTab = config.getString("dataSourceConfig.keytab");
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index e16ecce..1a17751 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -247,7 +247,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
JobCountEntity entity = new JobCountEntity();
entity.setTotal(jobs.size());
entity.setFail(0);
- jobs.stream().filter(job -> !job.getRight().equals(EagleJobStatus.SUCCESS.toString())).forEach(
+ jobs.stream().filter(job -> !job.getRight().equals(EagleJobStatus.SUCCEEDED.toString())).forEach(
job -> entity.setFail(1 + entity.getFail())
);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
index fb218e3..24fa097 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/EagleJobStatus.java
@@ -23,7 +23,7 @@ public enum EagleJobStatus {
LAUNCHED,
PREP,
RUNNING,
- SUCCESS,
+ SUCCEEDED,
KILLED,
FAILED;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index 6916aad..1570956 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -18,6 +18,8 @@
package org.apache.eagle.jpm.mr.history.parser;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
import org.apache.eagle.jpm.mr.historyentity.*;
@@ -32,6 +34,8 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -65,6 +69,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
protected String queueName;
protected Long jobLaunchTime;
protected JobHistoryContentFilter filter;
+ private JobHistoryEndpointConfig jobHistoryEndpointConfig;
protected final List<HistoryJobEntityLifecycleListener> jobEntityLifecycleListeners = new ArrayList<>();
@@ -96,8 +101,9 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
*
* @param baseTags
*/
- public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
+ public JHFEventReaderBase(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, JobHistoryEndpointConfig jobHistoryEndpointConfig) {
this.filter = filter;
+ this.jobHistoryEndpointConfig = jobHistoryEndpointConfig;
this.baseTags = baseTags;
jobSubmitEventEntity = new JobEventAPIEntity();
@@ -155,6 +161,18 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
}
}
+ private String buildJobTrackingUrl(String jobId) {
+ String jobTrackingUrlBase = this.jobHistoryEndpointConfig.mrHistoryServerUrl + "/jobhistory/job/";
+ try {
+ URI oldUri = new URI(jobTrackingUrlBase);
+ URI resolved = oldUri.resolve(jobId);
+ return resolved.toString();
+ } catch (URISyntaxException e) {
+ LOG.warn("Tracking url build failed with baseURL=%s, resolvePart=%s", jobTrackingUrlBase, jobId);
+ return jobTrackingUrlBase;
+ }
+ }
+
/**
* ...
* @param id
@@ -236,6 +254,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
jobExecutionEntity.getTags().put(MRJobTagName.JOB_QUEUE.toString(), queueName);
jobExecutionEntity.getTags().put(MRJobTagName.JOB_TYPE.toString(), this.jobType);
+ jobExecutionEntity.setTrackingUrl(buildJobTrackingUrl(jobId));
jobExecutionEntity.setCurrentState(values.get(Keys.JOB_STATUS));
jobExecutionEntity.setStartTime(jobLaunchEventEntity.getTimestamp());
jobExecutionEntity.setEndTime(jobFinishEventEntity.getTimestamp());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
index e20836f..0e9458a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer1EventReader.java
@@ -18,6 +18,7 @@
package org.apache.eagle.jpm.mr.history.parser;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
@@ -48,8 +49,8 @@ public class JHFMRVer1EventReader extends JHFEventReaderBase implements JHFMRVer
*
* @param baseTags
*/
- public JHFMRVer1EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
- super(baseTags, configuration, filter);
+ public JHFMRVer1EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, JobHistoryEndpointConfig jobHistoryEndpointConfig) {
+ super(baseTags, configuration, filter, jobHistoryEndpointConfig);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
index f21fd41..74f84f6 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -18,6 +18,7 @@
package org.apache.eagle.jpm.mr.history.parser;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig.JobHistoryEndpointConfig;
import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
import org.apache.eagle.jpm.mr.history.parser.JHFMRVer1Parser.Keys;
import org.apache.eagle.jpm.util.jobcounter.JobCounters;
@@ -43,8 +44,8 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
*
* @throws IOException
*/
- public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter) {
- super(baseTags, configuration, filter);
+ public JHFMRVer2EventReader(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, JobHistoryEndpointConfig jobHistoryEndpointConfig) {
+ super(baseTags, configuration, filter, jobHistoryEndpointConfig);
}
@SuppressWarnings("deprecation")
@@ -233,7 +234,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
if (js.getFailedReduces() != null) {
values.put(Keys.FAILED_REDUCES, js.getFailedReduces().toString());
}
- values.put(Keys.JOB_STATUS, EagleJobStatus.SUCCESS.name());
+ values.put(Keys.JOB_STATUS, EagleJobStatus.SUCCEEDED.name());
handleJob(wrapper.getType(), values, js.getTotalCounters());
}
@@ -289,7 +290,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
}
if (js.getStatus() != null) {
- values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+ values.put(Keys.TASK_STATUS, js.getStatus().toString());
}
handleTask(RecordTypes.Task, wrapper.getType(), values, js.getCounters());
}
@@ -308,7 +309,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
values.put(Keys.FINISH_TIME, js.getFinishTime().toString());
}
if (js.getStatus() != null) {
- values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getStatus().toString()));
+ values.put(Keys.TASK_STATUS, js.getStatus().toString());
}
if (js.getError() != null) {
values.put(Keys.ERROR, js.getError().toString());
@@ -381,7 +382,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
values.put(Keys.TASK_TYPE, js.getTaskType().toString());
}
if (js.getTaskStatus() != null) {
- values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
+ values.put(Keys.TASK_STATUS, js.getTaskStatus().toString());
}
if (js.getAttemptId() != null) {
values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
@@ -419,7 +420,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
values.put(Keys.TASK_TYPE, js.getTaskType().toString());
}
if (js.getTaskStatus() != null) {
- values.put(Keys.TASK_STATUS, normalizeTaskStatus(js.getTaskStatus().toString()));
+ values.put(Keys.TASK_STATUS, js.getTaskStatus().toString());
}
if (js.getAttemptId() != null) {
values.put(Keys.TASK_ATTEMPT_ID, js.getAttemptId().toString());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
index 718612d..386d50c 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -46,7 +46,7 @@ public class JHFParserFactory {
switch (f) {
case MRVer2:
- JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter);
+ JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter, configManager.getJobHistoryEndpointConfig());
reader2.addListener(new JobEntityCreationEagleServiceListener(configManager));
reader2.addListener(new TaskFailureListener(configManager));
reader2.addListener(new TaskAttemptCounterListener(configManager));
@@ -57,7 +57,7 @@ public class JHFParserFactory {
break;
case MRVer1:
default:
- JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter);
+ JHFMRVer1EventReader reader1 = new JHFMRVer1EventReader(baseTags, configuration, filter, configManager.getJobHistoryEndpointConfig());
reader1.addListener(new JobEntityCreationEagleServiceListener(configManager));
reader1.addListener(new TaskFailureListener(configManager));
reader1.addListener(new TaskAttemptCounterListener(configManager));
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index a681aca..520fbbc 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -43,7 +43,6 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
List<JobEventAPIEntity> jobEvents = new ArrayList<>();
List<TaskExecutionAPIEntity> taskExecs = new ArrayList<>();
List<TaskAttemptExecutionAPIEntity> taskAttemptExecs = new ArrayList<>();
- private JobHistoryZKStateManager zkState;
private TimeZone timeZone;
public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager) {
@@ -56,7 +55,6 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
throw new IllegalArgumentException("batchSize must be greater than 0 when it is provided");
}
this.batchSize = batchSize;
- zkState = new JobHistoryZKStateManager(configManager.getZkStateConfig());
timeZone = TimeZone.getTimeZone(configManager.getControlConfig().timeZone);
}
@@ -92,12 +90,13 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
eagleServiceConfig.password);
client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
+ JobHistoryZKStateManager zkState = new JobHistoryZKStateManager(configManager.getZkStateConfig());
logger.info("start flushing entities of total number " + list.size());
for (int i = 0; i < list.size(); i++) {
JobBaseAPIEntity entity = list.get(i);
if (entity instanceof JobExecutionAPIEntity) {
jobs.add((JobExecutionAPIEntity) entity);
- this.zkState.updateProcessedJob(timeStamp2Date(entity.getTimestamp()),
+ zkState.updateProcessedJob(timeStamp2Date(entity.getTimestamp()),
entity.getTags().get(MRJobTagName.JOB_ID.toString()),
((JobExecutionAPIEntity) entity).getCurrentState());
} else if (entity instanceof JobEventAPIEntity) {
@@ -108,6 +107,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
taskAttemptExecs.add((TaskAttemptExecutionAPIEntity) entity);
}
}
+ zkState.close();
GenericServiceAPIResponseEntity result;
if (jobs.size() > 0) {
logger.info("flush JobExecutionAPIEntity of number " + jobs.size());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index db2c716..de874a6 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -40,6 +40,7 @@
"zkRetryTimes" : 3,
"zkRetryInterval" : 20000,
"nnEndpoint" : "hdfs://sandbox.hortonworks.com:8020",
+ "mrHistoryServerUrl" : "http://sandbox.hortonworks.com:19888",
"principal":"", #if not need, then empty
"keytab":"",
"basePath" : "/mr-history/done",
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 9e156fa..5811f72 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -207,14 +207,15 @@ public class MRJobParser implements Runnable {
}
jobExecutionAPIEntity.setTimestamp(app.getStartedTime());
jobExecutionAPIEntity.setSubmissionTime(app.getStartedTime());
+ jobExecutionAPIEntity.setTrackingUrl(app.getTrackingUrl());
jobExecutionAPIEntity.setStartTime(mrJob.getStartTime());
jobExecutionAPIEntity.setDurationTime(mrJob.getElapsedTime());
jobExecutionAPIEntity.setCurrentState(mrJob.getState());
jobExecutionAPIEntity.setInternalState(mrJob.getState());
jobExecutionAPIEntity.setNumTotalMaps(mrJob.getMapsTotal());
- jobExecutionAPIEntity.setMapsCompleted(mrJob.getMapsCompleted());
+ jobExecutionAPIEntity.setNumFinishedMaps(mrJob.getMapsCompleted());
jobExecutionAPIEntity.setNumTotalReduces(mrJob.getReducesTotal());
- jobExecutionAPIEntity.setReducesCompleted(mrJob.getReducesCompleted());
+ jobExecutionAPIEntity.setNumFinishedReduces(mrJob.getReducesCompleted());
jobExecutionAPIEntity.setMapProgress(mrJob.getMapProgress());
jobExecutionAPIEntity.setReduceProgress(mrJob.getReduceProgress());
jobExecutionAPIEntity.setMapsPending(mrJob.getMapsPending());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
new file mode 100644
index 0000000..93c6c00
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobCountHelper.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse.JobCountResponse;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse.UnitJobCount;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MRJobCountHelper {
+
+ public void initJobCountList(List<UnitJobCount> jobCounts, long startTime, long endTime, long intervalInSecs) {
+ for (long i = startTime / intervalInSecs; i * intervalInSecs <= endTime; i++) {
+ jobCounts.add(new UnitJobCount(i * intervalInSecs));
+ }
+ }
+
+ public String moveTimeforwardOneDay(String startTime) throws ParseException {
+ long timeInSecs = DateTimeUtil.humanDateToSeconds(startTime);
+ timeInSecs -= 24L * 60L * 60L;
+ return DateTimeUtil.secondsToHumanDate(timeInSecs);
+ }
+
+ public JobCountResponse getRunningJobCount(List<JobExecutionAPIEntity> jobDurations,
+ long startTimeInSecs,
+ long endTimeInSecs,
+ long intervalInSecs) {
+ JobCountResponse response = new JobCountResponse();
+ List<UnitJobCount> jobCounts = new ArrayList<>();
+ initJobCountList(jobCounts, startTimeInSecs, endTimeInSecs, intervalInSecs);
+ for (JobExecutionAPIEntity jobDuration: jobDurations) {
+ countJob(jobCounts, jobDuration.getStartTime() / 1000, jobDuration.getEndTime() / 1000, intervalInSecs, jobDuration.getTags().get(MRJobTagName.JOB_TYPE.toString()));
+ }
+ response.jobCounts = jobCounts;
+ return response;
+ }
+
+ public JobCountResponse getHistoryJobCount(List<JobExecutionAPIEntity> jobDurations, String timeList) {
+ JobCountResponse response = new JobCountResponse();
+ List<UnitJobCount> jobCounts = new ArrayList<>();
+ List<Long> times = TaskCountByDurationHelper.parseTimeList(timeList);
+ for (int i = 0; i < times.size(); i++) {
+ jobCounts.add(new UnitJobCount(times.get(i)));
+ }
+ for (JobExecutionAPIEntity job : jobDurations) {
+ int jobIndex = TaskCountByDurationHelper.getPosition(times, job.getDurationTime());
+ UnitJobCount counter = jobCounts.get(jobIndex);
+ countJob(counter, job.getTags().get(MRJobTagName.JOB_TYPE.toString()));
+ }
+ response.jobCounts = jobCounts;
+ return response;
+ }
+
+ public void countJob(UnitJobCount counter, String jobType) {
+ if (null == jobType) {
+ jobType = "null";
+ }
+ counter.jobCount++;
+ if (counter.jobCountByType.containsKey(jobType)) {
+ counter.jobCountByType.put(jobType, counter.jobCountByType.get(jobType) + 1);
+ } else {
+ counter.jobCountByType.put(jobType, 1L);
+ }
+ }
+
+ public void countJob(List<UnitJobCount> jobCounts, long jobStartTimeSecs, long jobEndTimeSecs, long intervalInSecs, String jobType) {
+ long startCountPoint = jobCounts.get(0).timeBucket;
+ if (jobEndTimeSecs < startCountPoint) {
+ return;
+ }
+ int startIndex = 0;
+ if (jobStartTimeSecs > startCountPoint) {
+ long relativeStartTime = jobStartTimeSecs - startCountPoint;
+ startIndex = (int) (relativeStartTime / intervalInSecs) + (relativeStartTime % intervalInSecs == 0 ? 0 : 1);
+ }
+ long relativeEndTime = jobEndTimeSecs - startCountPoint;
+ int endIndex = (int) (relativeEndTime / intervalInSecs);
+
+ for (int i = startIndex; i <= endIndex && i < jobCounts.size(); i++) {
+ countJob(jobCounts.get(i), jobType);
+ }
+ }
+
+ public List<String> getSearchTimeDuration(List<JobExecutionAPIEntity> jobEntities) {
+ List<String> pair = new ArrayList<>();
+ long minStartTime = System.currentTimeMillis();
+ long maxEndTime = 0;
+ for (JobExecutionAPIEntity jobEntity : jobEntities) {
+ if (minStartTime > jobEntity.getStartTime()) {
+ minStartTime = jobEntity.getStartTime();
+ }
+ if (maxEndTime < jobEntity.getEndTime()) {
+ maxEndTime = jobEntity.getEndTime();
+ }
+ }
+ pair.add(DateTimeUtil.millisecondsToHumanDateWithSeconds(minStartTime));
+ pair.add(DateTimeUtil.millisecondsToHumanDateWithSeconds(maxEndTime));
+ return pair;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
index 3e487ae..5af9811 100644
--- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobExecutionResource.java
@@ -18,19 +18,25 @@
package org.apache.eagle.service.jpm;
-
import static org.apache.eagle.jpm.util.MRJobTagName.JOB_ID;
+import static org.apache.eagle.jpm.util.MRJobTagName.TASK_TYPE;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.generic.GenericEntityServiceResource;
+import org.apache.eagle.service.generic.ListQueryResource;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse.JobCountResponse;
+import org.apache.eagle.service.jpm.MRJobTaskCountResponse.TaskCountPerJobResponse;
import org.apache.commons.lang.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.text.ParseException;
import java.util.*;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
@@ -59,14 +65,14 @@ public class MRJobExecutionResource {
List<TaggedLogAPIEntity> jobs = new ArrayList<>();
List<TaggedLogAPIEntity> finishedJobs = new ArrayList<>();
Set<String> jobIds = new HashSet<>();
- final Map<String,Object> meta = new HashMap<>();
+ final Map<String, Object> meta = new HashMap<>();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
String jobQuery = String.format(query, Constants.JPA_JOB_EXECUTION_SERVICE_NAME);
GenericServiceAPIResponseEntity<TaggedLogAPIEntity> res =
- resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin,
- top,filterIfMissing, parallel, metricName, verbose);
+ resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin,
+ top, filterIfMissing, parallel, metricName, verbose);
if (res.isSuccess() && res.getObj() != null) {
for (TaggedLogAPIEntity o : res.getObj()) {
finishedJobs.add(o);
@@ -74,10 +80,10 @@ public class MRJobExecutionResource {
}
jobQuery = String.format(query, Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME);
res = resource.search(jobQuery, startTime, endTime, pageSize, startRowkey, treeAgg, timeSeries, intervalmin,
- top,filterIfMissing, parallel, metricName, verbose);
+ top, filterIfMissing, parallel, metricName, verbose);
if (res.isSuccess() && res.getObj() != null) {
for (TaggedLogAPIEntity o : res.getObj()) {
- if (! isDuplicate(jobIds, o)) {
+ if (!isDuplicate(jobIds, o)) {
jobs.add(o);
}
}
@@ -92,7 +98,7 @@ public class MRJobExecutionResource {
response.setException(new Exception(res.getException()));
}
meta.put(TOTAL_RESULTS, jobs.size());
- meta.put(ELAPSEDMS,stopWatch.getTime());
+ meta.put(ELAPSEDMS, stopWatch.getTime());
response.setObj(jobs);
response.setMeta(meta);
return response;
@@ -107,7 +113,7 @@ public class MRJobExecutionResource {
}
private String buildCondition(String jobId, String jobDefId, String site) {
- String conditionFormat = "@site=\"%s\"" ;
+ String conditionFormat = "@site=\"%s\"";
String condition = null;
if (jobDefId != null) {
conditionFormat = conditionFormat + " AND @jobDefId=\"%s\"";
@@ -144,12 +150,12 @@ public class MRJobExecutionResource {
}
LOG.debug("search condition=" + condition);
- final Map<String,Object> meta = new HashMap<>();
+ final Map<String, Object> meta = new HashMap<>();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
String queryFormat = "%s[%s]{*}";
String queryString = String.format(queryFormat, Constants.JPA_JOB_EXECUTION_SERVICE_NAME, condition);
- GenericServiceAPIResponseEntity<TaggedLogAPIEntity> res = resource.search(queryString, null, null, pageSize, null, false, true, 0L, 0, true, 0, null, false);
+ GenericServiceAPIResponseEntity<TaggedLogAPIEntity> res = resource.search(queryString, null, null, pageSize, null, false, true, 0L, 0, true, 0, null, false);
if (res.isSuccess() && res.getObj() != null) {
for (TaggedLogAPIEntity o : res.getObj()) {
jobs.add(o);
@@ -157,10 +163,10 @@ public class MRJobExecutionResource {
}
}
queryString = String.format(queryFormat, Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, condition);
- res = resource.search(queryString, null, null, pageSize, null, false, true, 0L, 0, true, 0, null, false);
+ res = resource.search(queryString, null, null, pageSize, null, false, true, 0L, 0, true, 0, null, false);
if (res.isSuccess() && res.getObj() != null) {
for (TaggedLogAPIEntity o : res.getObj()) {
- if (! isDuplicate(jobIds, o)) {
+ if (!isDuplicate(jobIds, o)) {
jobs.add(o);
}
}
@@ -181,128 +187,228 @@ public class MRJobExecutionResource {
response.setException(new Exception(res.getException()));
}
meta.put(TOTAL_RESULTS, jobs.size());
- meta.put(ELAPSEDMS,stopWatch.getTime());
+ meta.put(ELAPSEDMS, stopWatch.getTime());
response.setObj(jobs);
response.setMeta(meta);
return response;
}
- public List<Long> parseTimeList(String timelist) {
- List<Long> times = new ArrayList<>();
- String [] strs = timelist.split("[,\\s]");
- for (String str : strs) {
- try {
- times.add(Long.parseLong(str));
- } catch (Exception ex) {
- LOG.warn(str + " is not a number");
- }
- }
- return times;
- }
- public int getPosition(List<Long> times, Long duration) {
- duration = duration / 1000;
- for (int i = 1; i < times.size(); i++) {
- if (duration < times.get(i)) {
- return i - 1;
- }
- }
- return times.size() - 1;
- }
- public void getTopTasks(List<MRJobTaskGroupResponse.UnitTaskCount> list, long top) {
- for (MRJobTaskGroupResponse.UnitTaskCount taskCounter : list) {
- Iterator<TaskExecutionAPIEntity> iterator = taskCounter.entities.iterator();
- for (int i = 0; i < top && iterator.hasNext(); i++) {
- taskCounter.topEntities.add(iterator.next());
- }
- taskCounter.entities.clear();
- }
- }
-
- public void initTaskCountList(List<MRJobTaskGroupResponse.UnitTaskCount> runningTaskCount,
- List<MRJobTaskGroupResponse.UnitTaskCount> finishedTaskCount,
- List<Long> times,
- Comparator comparator) {
- for (int i = 0; i < times.size(); i++) {
- runningTaskCount.add(new MRJobTaskGroupResponse.UnitTaskCount(times.get(i), comparator));
- finishedTaskCount.add(new MRJobTaskGroupResponse.UnitTaskCount(times.get(i), comparator));
- }
- }
@GET
- @Path("{jobId}/taskCounts")
+ @Path("{jobId}/taskCountsByDuration")
@Produces(MediaType.APPLICATION_JSON)
- public MRJobTaskGroupResponse getTaskCounts(@PathParam("jobId") String jobId,
- @QueryParam("site") String site,
- @QueryParam("timelineInSecs") String timeList,
- @QueryParam("top") long top) {
- MRJobTaskGroupResponse response = new MRJobTaskGroupResponse();
+ public TaskCountPerJobResponse getTaskCountsPerJob(@PathParam("jobId") String jobId,
+ @QueryParam("site") String site,
+ @QueryParam("timelineInSecs") String timeList,
+ @QueryParam("top") long top) {
+ TaskCountPerJobResponse response = new TaskCountPerJobResponse();
if (jobId == null || site == null || timeList == null || timeList.isEmpty()) {
response.errMessage = "IllegalArgumentException: jobId == null || site == null || timelineInSecs == null or isEmpty";
return response;
}
- List<MRJobTaskGroupResponse.UnitTaskCount> runningTaskCount = new ArrayList<>();
- List<MRJobTaskGroupResponse.UnitTaskCount> finishedTaskCount = new ArrayList<>();
+ TaskCountByDurationHelper helper = new TaskCountByDurationHelper();
+ List<MRJobTaskCountResponse.UnitTaskCount> runningTaskCount = new ArrayList<>();
+ List<MRJobTaskCountResponse.UnitTaskCount> finishedTaskCount = new ArrayList<>();
- List<Long> times = parseTimeList(timeList);
+ List<Long> times = helper.parseTimeList(timeList);
String query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_TASK_EXECUTION_SERVICE_NAME, site, jobId);
GenericServiceAPIResponseEntity<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> historyRes =
- resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
+ resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
if (historyRes.isSuccess() && historyRes.getObj() != null && historyRes.getObj().size() > 0) {
- initTaskCountList(runningTaskCount, finishedTaskCount, times, new HistoryTaskComparator());
+ helper.initTaskCountList(runningTaskCount, finishedTaskCount, times, new TaskCountByDurationHelper.HistoryTaskComparator());
for (org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o : historyRes.getObj()) {
- int index = getPosition(times, o.getDuration());
- MRJobTaskGroupResponse.UnitTaskCount counter = finishedTaskCount.get(index);
- counter.taskCount++;
+ int index = helper.getPosition(times, o.getDuration());
+ MRJobTaskCountResponse.UnitTaskCount counter = finishedTaskCount.get(index);
+ helper.countTask(counter, o.getTags().get(TASK_TYPE.toString()));
counter.entities.add(o);
}
} else {
query = String.format("%s[@site=\"%s\" AND @jobId=\"%s\"]{*}", Constants.JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME, site, jobId);
GenericServiceAPIResponseEntity<TaskExecutionAPIEntity> runningRes =
- resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
+ resource.search(query, null, null, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
if (runningRes.isSuccess() && runningRes.getObj() != null) {
- initTaskCountList(runningTaskCount, finishedTaskCount, times, new RunningTaskComparator());
+ helper.initTaskCountList(runningTaskCount, finishedTaskCount, times, new TaskCountByDurationHelper.RunningTaskComparator());
for (TaskExecutionAPIEntity o : runningRes.getObj()) {
- int index = getPosition(times, o.getDuration());
+ int index = helper.getPosition(times, o.getDuration());
if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) {
- MRJobTaskGroupResponse.UnitTaskCount counter = runningTaskCount.get(index);
- counter.taskCount++;
+ MRJobTaskCountResponse.UnitTaskCount counter = runningTaskCount.get(index);
+ helper.countTask(counter, o.getTags().get(TASK_TYPE.toString()));
counter.entities.add(o);
} else if (o.getEndTime() != 0) {
- MRJobTaskGroupResponse.UnitTaskCount counter = finishedTaskCount.get(index);
- counter.taskCount++;
+ MRJobTaskCountResponse.UnitTaskCount counter = finishedTaskCount.get(index);
+ helper.countTask(counter, o.getTags().get(TASK_TYPE.toString()));
counter.entities.add(o);
}
}
}
}
- if (top > 0) {
- getTopTasks(runningTaskCount, top);
+ if (top > 0) {
+ helper.getTopTasks(runningTaskCount, top);
response.runningTaskCount = runningTaskCount;
- getTopTasks(finishedTaskCount, top);
+ helper.getTopTasks(finishedTaskCount, top);
response.finishedTaskCount = finishedTaskCount;
}
+ response.topNumber = top;
return response;
}
- static class RunningTaskComparator implements Comparator<TaskExecutionAPIEntity> {
- @Override
- public int compare(TaskExecutionAPIEntity o1, TaskExecutionAPIEntity o2) {
- Long time1 = o1.getDuration();
- Long time2 = o2.getDuration();
- return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1);
+ @GET
+ @Path("runningJobCounts")
+ @Produces(MediaType.APPLICATION_JSON)
+ public JobCountResponse getRunningJobCount(@QueryParam("site") String site,
+ @QueryParam("durationBegin") String startTime,
+ @QueryParam("durationEnd") String endTime,
+ @QueryParam("intervalInSecs") long intervalInSecs) {
+ JobCountResponse response = new JobCountResponse();
+ MRJobCountHelper helper = new MRJobCountHelper();
+ if (site == null || startTime == null || endTime == null) {
+ response.errMessage = "IllegalArgument: site, durationBegin, durationEnd is null";
+ return response;
+ }
+ if (intervalInSecs <= 0) {
+ response.errMessage = String.format("IllegalArgument: intervalInSecs=%s is invalid", intervalInSecs);
+ return response;
+ }
+ long startTimeInMills;
+ String searchStartTime = startTime;
+ String searchEndTime = endTime;
+ try {
+ startTimeInMills = DateTimeUtil.humanDateToSeconds(startTime) * DateTimeUtil.ONESECOND;
+ searchStartTime = helper.moveTimeforwardOneDay(searchStartTime);
+ } catch (Exception e) {
+ response.errMessage = e.getMessage();
+ return response;
+ }
+ String query = String.format("%s[@site=\"%s\" AND @endTime>=%s]{@startTime,@endTime,@jobType}", Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, startTimeInMills);
+ GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes =
+ resource.search(query, searchStartTime, searchEndTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
+ if (!historyRes.isSuccess() || historyRes.getObj() == null) {
+ response.errMessage = String.format("Catch an exception: %s with query=%s", historyRes.getException(), query);
+ return response;
}
+
+ try {
+ long startTimeInSecs = DateTimeUtil.humanDateToSeconds(startTime);
+ long endTimeInSecs = DateTimeUtil.humanDateToSeconds(endTime);
+ return helper.getRunningJobCount(historyRes.getObj(), startTimeInSecs, endTimeInSecs, intervalInSecs);
+ } catch (Exception e) {
+ response.errMessage = e.getMessage();
+ return response;
+ }
+ }
+
+ @GET
+ @Path("jobMetrics/entities")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Object getJobMetricsByEntitiesQuery(@QueryParam("site") String site,
+ @QueryParam("timePoint") String timePoint,
+ @QueryParam("metricName") String metricName,
+ @QueryParam("intervalmin") long intervalmin,
+ @QueryParam("top") int top) {
+ return getJobMetrics(site, timePoint, metricName, intervalmin, top, queryMetricEntitiesFunc);
}
- static class HistoryTaskComparator implements Comparator<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> {
- @Override
- public int compare(org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o1,
- org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o2) {
- Long time1 = o1.getDuration();
- Long time2 = o2.getDuration();
- return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1);
+ @GET
+ @Path("jobMetrics/list")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Object getJobMetricsByListQuery(@QueryParam("site") String site,
+ @QueryParam("timePoint") String timePoint,
+ @QueryParam("metricName") String metricName,
+ @QueryParam("intervalmin") long intervalmin,
+ @QueryParam("top") int top) {
+ return getJobMetrics(site, timePoint, metricName, intervalmin, top, queryMetricListFunc);
+ }
+
+ public Object getJobMetrics(String site, String timePoint, String metricName, long intervalmin, int top,
+ Function6<String, String, String, Long, Integer, String, Object> metricQueryFunc) {
+ GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity();
+ MRJobCountHelper helper = new MRJobCountHelper();
+ if (site == null || timePoint == null || metricName == null) {
+ response.setException(new IllegalArgumentException("Error: site, timePoint, metricName may be unset"));
+ response.setSuccess(false);
+ return response;
+ }
+ if (intervalmin <= 0) {
+ LOG.warn("query parameter intervalmin <= 0, use default value 5 instead");
+ intervalmin = 5;
}
+ if (top <= 0) {
+ LOG.warn("query parameter top <= 0, use default value 10 instead");
+ top = 10;
+ }
+
+ long timePointsInMills;
+ String searchStartTime = timePoint;
+ String searchEndTime = timePoint;
+ try {
+ timePointsInMills = DateTimeUtil.humanDateToSeconds(timePoint) * DateTimeUtil.ONESECOND;
+ searchStartTime = helper.moveTimeforwardOneDay(searchStartTime);
+ } catch (ParseException e) {
+ response.setException(e);
+ response.setSuccess(false);
+ return response;
+ }
+ String query = String.format("%s[@site=\"%s\" AND @startTime<=\"%s\" AND @endTime>=\"%s\"]{@startTime,@endTime}",
+ Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site, timePointsInMills, timePointsInMills);
+ GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes =
+ resource.search(query, searchStartTime, searchEndTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
+ if (!historyRes.isSuccess() || historyRes.getObj() == null) {
+ return historyRes;
+ }
+
+ List<String> timeDuration = helper.getSearchTimeDuration(historyRes.getObj());
+ LOG.info(String.format("new search time range: startTime=%s, endTime=%s", timeDuration.get(0), timeDuration.get(1)));
+ query = String.format("%s[@site=\"%s\"]<@jobId>{sum(value)}.{sum(value) desc}", Constants.GENERIC_METRIC_SERVICE, site);
+ return metricQueryFunc.apply(query, timeDuration.get(0), timeDuration.get(1), intervalmin, top, metricName);
}
+ Function6<String, String, String, Long, Integer, String, Object> queryMetricEntitiesFunc
+ = (query, startTime, endTime, intervalmin, top, metricName) -> {
+ GenericEntityServiceResource resource = new GenericEntityServiceResource();
+ return resource.search(query, startTime, endTime, Integer.MAX_VALUE, null,
+ false, true, intervalmin, top, true, 0, metricName, false);
+ };
+
+ Function6<String, String, String, Long, Integer, String, Object> queryMetricListFunc
+ = (query, startTime, endTime, intervalmin, top, metricName) -> {
+ ListQueryResource resource = new ListQueryResource();
+ return resource.listQuery(query, startTime, endTime, Integer.MAX_VALUE, null,
+ false, true, intervalmin, top, true, 0, metricName, false);
+ };
+
+ @FunctionalInterface
+ interface Function6<A, B, C, D, E, F, R> {
+ R apply(A a, B b, C c, D d, E e, F f);
+ }
+
+ @GET
+ @Path("jobCountsByDuration")
+ @Produces(MediaType.APPLICATION_JSON)
+ public JobCountResponse getJobCountGroupByDuration(@QueryParam("site") String site,
+ @QueryParam("timelineInSecs") String timeList,
+ @QueryParam("jobStartTimeBegin") String startTime,
+ @QueryParam("jobStartTimeEnd") String endTime) {
+ JobCountResponse response = new JobCountResponse();
+ MRJobCountHelper helper = new MRJobCountHelper();
+ if (site == null || startTime == null || endTime == null || timeList == null) {
+ response.errMessage = "IllegalArgument: site, jobStartTimeBegin, jobStartTimeEnd, or timelineInSecs is null";
+ return response;
+ }
+ String query = String.format("%s[@site=\"%s\"]{@durationTime,@jobType}", Constants.JPA_JOB_EXECUTION_SERVICE_NAME, site);
+ GenericServiceAPIResponseEntity<JobExecutionAPIEntity> historyRes =
+ resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, true, 0L, 0, true, 0, null, false);
+ if (!historyRes.isSuccess() || historyRes.getObj() == null) {
+ response.errMessage = String.format("Catch an exception: %s with query=%s", historyRes.getException(), query);
+ return response;
+ }
+ try {
+ return helper.getHistoryJobCount(historyRes.getObj(), timeList);
+ } catch (Exception e) {
+ response.errMessage = e.getMessage();
+ return response;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
new file mode 100644
index 0000000..c546198
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskCountResponse.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import java.util.*;
+
+public class MRJobTaskCountResponse {
+ public String errMessage;
+
+ public static class TaskCountPerJobResponse extends MRJobTaskCountResponse {
+ public long topNumber;
+ public List<UnitTaskCount> runningTaskCount;
+ public List<UnitTaskCount> finishedTaskCount;
+ }
+
+ public static class JobCountResponse extends MRJobTaskCountResponse {
+ public List<UnitJobCount> jobCounts;
+ }
+
+ static class UnitTaskCount {
+ public long timeBucket;
+ public int taskCount;
+ public int mapTaskCount;
+ public int reduceTaskCount;
+ public Set entities;
+ public List topEntities;
+
+ UnitTaskCount(long timeBucket, Comparator comparator) {
+ this.timeBucket = timeBucket;
+ this.taskCount = 0;
+ this.mapTaskCount = 0;
+ this.reduceTaskCount = 0;
+ entities = new TreeSet<>(comparator);
+ topEntities = new ArrayList<>();
+ }
+ }
+
+ static class UnitJobCount {
+ public long timeBucket;
+ public long jobCount;
+ public Map<String, Long> jobCountByType;
+
+ UnitJobCount(long timeBucket) {
+ this.timeBucket = timeBucket;
+ this.jobCount = 0;
+ this.jobCountByType = new HashMap<>();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java
deleted file mode 100644
index 3be9b43..0000000
--- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/MRJobTaskGroupResponse.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.service.jpm;
-
-import java.util.*;
-
-class MRJobTaskGroupResponse {
- public List<UnitTaskCount> runningTaskCount;
- public List<UnitTaskCount> finishedTaskCount;
- public String errMessage;
-
- static class UnitTaskCount {
- public long timeBucket;
- public int taskCount;
- public Set entities;
- public List topEntities;
-
- UnitTaskCount(long timeBucket, Comparator comparator) {
- this.timeBucket = timeBucket;
- this.taskCount = 0;
- entities = new TreeSet<>(comparator);
- topEntities = new ArrayList<>();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java
new file mode 100644
index 0000000..0eeb440
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/TaskCountByDurationHelper.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+public class TaskCountByDurationHelper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TaskCountByDurationHelper.class);
+
+ public static List<Long> parseTimeList(String timelist) {
+ List<Long> times = new ArrayList<>();
+ String [] strs = timelist.split("[,\\s]");
+ for (String str : strs) {
+ try {
+ times.add(Long.parseLong(str));
+ } catch (Exception ex) {
+ LOG.warn(str + " is not a number");
+ }
+ }
+ return times;
+ }
+
+ public static int getPosition(List<Long> times, Long duration) {
+ duration = duration / 1000;
+ for (int i = 1; i < times.size(); i++) {
+ if (duration < times.get(i)) {
+ return i - 1;
+ }
+ }
+ return times.size() - 1;
+ }
+
+ public void getTopTasks(List<MRJobTaskCountResponse.UnitTaskCount> list, long top) {
+ for (MRJobTaskCountResponse.UnitTaskCount taskCounter : list) {
+ Iterator<TaskExecutionAPIEntity> iterator = taskCounter.entities.iterator();
+ for (int i = 0; i < top && iterator.hasNext(); i++) {
+ taskCounter.topEntities.add(iterator.next());
+ }
+ taskCounter.entities.clear();
+ }
+ }
+
+ public void countTask(MRJobTaskCountResponse.UnitTaskCount counter, String taskType) {
+ counter.taskCount++;
+ if (taskType.equalsIgnoreCase(Constants.TaskType.MAP.toString())) {
+ counter.mapTaskCount++;
+ } else if (taskType.equalsIgnoreCase(Constants.TaskType.REDUCE.toString())) {
+ counter.reduceTaskCount++;
+ }
+ }
+
+ public void initTaskCountList(List<MRJobTaskCountResponse.UnitTaskCount> runningTaskCount,
+ List<MRJobTaskCountResponse.UnitTaskCount> finishedTaskCount,
+ List<Long> times,
+ Comparator comparator) {
+ for (int i = 0; i < times.size(); i++) {
+ runningTaskCount.add(new MRJobTaskCountResponse.UnitTaskCount(times.get(i), comparator));
+ finishedTaskCount.add(new MRJobTaskCountResponse.UnitTaskCount(times.get(i), comparator));
+ }
+ }
+
+ static class RunningTaskComparator implements Comparator<TaskExecutionAPIEntity> {
+ @Override
+ public int compare(TaskExecutionAPIEntity o1, TaskExecutionAPIEntity o2) {
+ Long time1 = o1.getDuration();
+ Long time2 = o2.getDuration();
+ return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1);
+ }
+ }
+
+ static class HistoryTaskComparator implements Comparator<org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity> {
+ @Override
+ public int compare(org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o1,
+ org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity o2) {
+ Long time1 = o1.getDuration();
+ Long time2 = o2.getDuration();
+ return (time1 > time2 ? -1 : (time1 == time2) ? 0 : 1);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
new file mode 100644
index 0000000..718f068
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestJobCountPerBucketHelper.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestJobCountPerBucketHelper {
+ MRJobCountHelper helper = new MRJobCountHelper();
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestJobCountPerBucketHelper.class);
+
+ @Test
+ public void test() throws ParseException {
+ String timeString = "2016-08-22 20:13:00";
+ long timestamp = DateTimeUtil.humanDateToSeconds(timeString);
+ String timeString2 = DateTimeUtil.secondsToHumanDate(timestamp);
+ Assert.assertTrue(timeString2.equals(timeString));
+
+ String timeString3 = helper.moveTimeforwardOneDay(timeString);
+ Assert.assertTrue(timeString3.equals("2016-08-21 20:13:00"));
+ }
+
+ @Test
+ public void test2() throws ParseException {
+ String startTime = "2016-08-22 20:13:00";
+ String endTime = "2016-08-22 24:13:00";
+ List<MRJobTaskCountResponse.UnitJobCount> jobCounts = new ArrayList<>();
+ helper.initJobCountList(jobCounts, DateTimeUtil.humanDateToSeconds(startTime), DateTimeUtil.humanDateToSeconds(endTime), 15 * 60);
+ /*for (MRJobTaskCountResponse.UnitJobCount jobCount : jobCounts) {
+ LOG.info(DateTimeUtil.secondsToHumanDate(jobCount.timeBucket));
+ }*/
+ Assert.assertTrue(DateTimeUtil.secondsToHumanDate(jobCounts.get(1).timeBucket).equals("2016-08-22 20:15:00"));
+ }
+
+ @Test
+ public void test3() {
+ List<MRJobTaskCountResponse.UnitJobCount> jobCounts = new ArrayList<>();
+ long intervalSecs = 5;
+ helper.initJobCountList(jobCounts, 3, 31, intervalSecs);
+ helper.countJob(jobCounts, 5, 10, intervalSecs, "hive");
+ helper.countJob(jobCounts, 13, 18, intervalSecs, "hive");
+ helper.countJob(jobCounts, 18, 28, intervalSecs, "hive");
+ helper.countJob(jobCounts, 25, 33, intervalSecs, "hive");
+ Assert.assertTrue(jobCounts.size() == 7);
+ Assert.assertTrue(jobCounts.get(1).jobCount == 1);
+ Assert.assertTrue(jobCounts.get(5).jobCount == 2);
+ }
+
+ @Test
+ public void test4() throws ParseException {
+ List<MRJobTaskCountResponse.UnitJobCount> jobCounts = new ArrayList<>();
+ long intervalSecs = 60 * 15;
+ String startTime = "2016-08-22 20:13:00";
+ String endTime = "2016-08-22 24:13:00";
+ helper.initJobCountList(jobCounts, DateTimeUtil.humanDateToSeconds(startTime), DateTimeUtil.humanDateToSeconds(endTime), intervalSecs);
+ helper.countJob(jobCounts,
+ DateTimeUtil.humanDateToSeconds("2016-08-22 20:23:00"),
+ DateTimeUtil.humanDateToSeconds("2016-08-22 20:30:00"),
+ intervalSecs,
+ "hive");
+ Assert.assertTrue(jobCounts.get(2).jobCount == 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java
deleted file mode 100644
index 824556b..0000000
--- a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestMRJobExecutionResource.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.service.jpm;
-
-import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
-import org.apache.eagle.jpm.util.Constants;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-
-public class TestMRJobExecutionResource {
-
- @Test
- public void test() {
- MRJobExecutionResource resource = new MRJobExecutionResource();
- String timeList = " 0, 10,20,40 ";
- List<Long> times = resource.parseTimeList(timeList);
- Assert.assertTrue(times.size() == 4);
-
- long val = 25 * 1000;
- int index = resource.getPosition(times, val);
- Assert.assertTrue(index == 2);
- }
-
- @Test
- public void test2() {
- MRJobExecutionResource resource = new MRJobExecutionResource();
- String timeList = " 0, 10,20,40 ";
- List<Long> times = resource.parseTimeList(timeList);
-
- TaskExecutionAPIEntity test1 = new TaskExecutionAPIEntity();
- test1.setDuration(15 * 1000);
- test1.setTaskStatus("running");
- TaskExecutionAPIEntity test4 = new TaskExecutionAPIEntity();
- test4.setDuration(13 * 1000);
- test4.setTaskStatus("running");
- TaskExecutionAPIEntity test2 = new TaskExecutionAPIEntity();
- test2.setDuration(0 * 1000);
- test2.setEndTime(100);
- test2.setTaskStatus("x");
- TaskExecutionAPIEntity test3 = new TaskExecutionAPIEntity();
- test3.setDuration(19 * 1000);
- test3.setTaskStatus("running");
- TaskExecutionAPIEntity test5 = new TaskExecutionAPIEntity();
- test5.setDuration(20 * 1000);
- test5.setEndTime(28);
- test5.setTaskStatus("x");
- List<TaskExecutionAPIEntity> tasks = new ArrayList<>();
- tasks.add(test1);
- tasks.add(test2);
- tasks.add(test3);
- tasks.add(test4);
- tasks.add(test5);
-
- List<MRJobTaskGroupResponse.UnitTaskCount> runningTaskCount = new ArrayList<>();
- List<MRJobTaskGroupResponse.UnitTaskCount> finishedTaskCount = new ArrayList<>();
-
- Comparator comparator = new MRJobExecutionResource.RunningTaskComparator();
- resource.initTaskCountList(runningTaskCount, finishedTaskCount, times, comparator);
-
- for (TaskExecutionAPIEntity o : tasks) {
- int index = resource.getPosition(times, o.getDuration());
- if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) {
- MRJobTaskGroupResponse.UnitTaskCount counter = runningTaskCount.get(index);
- counter.taskCount++;
- counter.entities.add(o);
- } else if (o.getEndTime() != 0) {
- MRJobTaskGroupResponse.UnitTaskCount counter = finishedTaskCount.get(index);
- counter.taskCount++;
- counter.entities.add(o);
- }
- }
- int top = 2;
- if (top > 0) {
- resource.getTopTasks(runningTaskCount, top);
- }
- Assert.assertTrue(runningTaskCount.get(1).taskCount == 3);
- Assert.assertTrue(runningTaskCount.get(1).topEntities.size() == 2);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java
new file mode 100644
index 0000000..2cd0b8e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-service/src/test/java/org/apache/eagle/service/jpm/TestTaskCountPerJobHelper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.service.jpm;
+
+import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestTaskCountPerJobHelper {
+ TaskCountByDurationHelper helper = new TaskCountByDurationHelper();
+
+ @Test
+ public void test() {
+ String timeList = " 0, 10,20,40 ";
+ List<Long> times = helper.parseTimeList(timeList);
+ Assert.assertTrue(times.size() == 4);
+
+ long val = 25 * 1000;
+ int index = helper.getPosition(times, val);
+ Assert.assertTrue(index == 2);
+ }
+
+ @Test
+ public void test2() {
+ TaskExecutionAPIEntity test1 = new TaskExecutionAPIEntity();
+ test1.setDuration(15 * 1000);
+ test1.setTaskStatus("running");
+ TaskExecutionAPIEntity test4 = new TaskExecutionAPIEntity();
+ test4.setDuration(13 * 1000);
+ test4.setTaskStatus("running");
+ TaskExecutionAPIEntity test2 = new TaskExecutionAPIEntity();
+ test2.setDuration(0 * 1000);
+ test2.setEndTime(100);
+ test2.setTaskStatus("x");
+ TaskExecutionAPIEntity test3 = new TaskExecutionAPIEntity();
+ test3.setDuration(19 * 1000);
+ test3.setTaskStatus("running");
+ TaskExecutionAPIEntity test5 = new TaskExecutionAPIEntity();
+ test5.setDuration(20 * 1000);
+ test5.setEndTime(28);
+ test5.setTaskStatus("x");
+ List<TaskExecutionAPIEntity> tasks = new ArrayList<>();
+ tasks.add(test1);
+ tasks.add(test2);
+ tasks.add(test3);
+ tasks.add(test4);
+ tasks.add(test5);
+
+ List<MRJobTaskCountResponse.UnitTaskCount> runningTaskCount = new ArrayList<>();
+ List<MRJobTaskCountResponse.UnitTaskCount> finishedTaskCount = new ArrayList<>();
+
+ String timeList = " 0, 10,20,40 ";
+ List<Long> times = helper.parseTimeList(timeList);
+
+ helper.initTaskCountList(runningTaskCount, finishedTaskCount, times, new TaskCountByDurationHelper.RunningTaskComparator());
+
+ for (TaskExecutionAPIEntity o : tasks) {
+ int index = helper.getPosition(times, o.getDuration());
+ if (o.getTaskStatus().equalsIgnoreCase(Constants.TaskState.RUNNING.toString())) {
+ MRJobTaskCountResponse.UnitTaskCount counter = runningTaskCount.get(index);
+ counter.taskCount++;
+ counter.entities.add(o);
+ } else if (o.getEndTime() != 0) {
+ MRJobTaskCountResponse.UnitTaskCount counter = finishedTaskCount.get(index);
+ counter.taskCount++;
+ counter.entities.add(o);
+ }
+ }
+ int top = 2;
+ if (top > 0) {
+ helper.getTopTasks(runningTaskCount, top);
+ }
+ Assert.assertTrue(runningTaskCount.get(1).taskCount == 3);
+ Assert.assertTrue(runningTaskCount.get(1).topEntities.size() == 2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a66f64cf/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index 7dce0a2..ec56eac 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -23,6 +23,8 @@ import org.slf4j.LoggerFactory;
public class Constants {
private static final Logger LOG = LoggerFactory.getLogger(Constants.class);
+ public static final String GENERIC_METRIC_SERVICE = "GenericMetricService";
+
//SPARK
public static final String SPARK_APP_SERVICE_ENDPOINT_NAME = "SparkAppService";
public static final String SPARK_JOB_SERVICE_ENDPOINT_NAME = "SparkJobService";