You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/09/02 03:49:02 UTC
incubator-eagle git commit: [EAGLE-518] add Job counter metrics for
mr history job
Repository: incubator-eagle
Updated Branches:
refs/heads/develop 9488afc15 -> b2b16b745
[EAGLE-518] add Job counter metrics for mr history job
Author: wujinhu <wu...@126.com>
Closes #414 from wujinhu/EAGLE-518.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/b2b16b74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/b2b16b74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/b2b16b74
Branch: refs/heads/develop
Commit: b2b16b745e210eb80929d970820c322d557d93e0
Parents: 9488afc
Author: wujinhu <wu...@126.com>
Authored: Fri Sep 2 11:48:54 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Fri Sep 2 11:48:54 2016 +0800
----------------------------------------------------------------------
.../history/crawler/JHFCrawlerDriverImpl.java | 53 +++---------
.../metrics/JobCountMetricsGenerator.java | 88 ++++++++++++++++++++
.../JobExecutionMetricsCreationListener.java | 75 +++++++++++++++++
.../JobEntityCreationEagleServiceListener.java | 13 +++
.../AbstractMetricsCreationListener.java | 42 ----------
.../JobExecutionMetricsCreationListener.java | 9 +-
.../TaskExecutionMetricsCreationListener.java | 3 +-
.../org/apache/eagle/jpm/util/Constants.java | 4 +
.../AbstractMetricsCreationListener.java | 42 ++++++++++
9 files changed, 239 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
index 077f4e1..2f326fe 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java
@@ -19,6 +19,7 @@
package org.apache.eagle.jpm.mr.history.crawler;
import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
+import org.apache.eagle.jpm.mr.history.metrics.JobCountMetricsGenerator;
import org.apache.eagle.jpm.mr.history.parser.EagleJobStatus;
import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
import org.apache.eagle.jpm.mr.historyentity.JobCountEntity;
@@ -62,15 +63,12 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
private JobIdFilter jobFilter;
private int partitionId;
private TimeZone timeZone;
- private MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig;
- private MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig;
+ private JobCountMetricsGenerator jobCountMetricsGenerator;
public JHFCrawlerDriverImpl(MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig,
MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig,
MRHistoryJobConfig.ControlConfig controlConfig, JHFInputStreamCallback reader,
JobHistoryLCM historyLCM, JobIdFilter jobFilter, int partitionId) throws Exception {
- this.eagleServiceConfig = eagleServiceConfig;
- this.jobExtractorConfig = jobExtractorConfig;
this.zeroBasedMonth = controlConfig.zeroBasedMonth;
this.dryRun = controlConfig.dryRun;
if (this.dryRun) {
@@ -81,6 +79,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
this.partitionId = partitionId;
this.jobFilter = jobFilter;
timeZone = TimeZone.getTimeZone(controlConfig.timeZone);
+ jobCountMetricsGenerator = new JobCountMetricsGenerator(eagleServiceConfig, jobExtractorConfig, timeZone);
}
/**
@@ -191,7 +190,13 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
jobHistoryFile);
processedJobFileNames.add(jobHistoryFile);
- flushJobCount();
+ jobCountMetricsGenerator.flush(
+ String.format(FORMAT_JOB_PROCESS_DATE,
+ this.processDate.year,
+ this.processDate.month + 1,
+ this.processDate.day),
+ this.processDate.year, this.processDate.month, this.processDate.day
+ );
Long modifiedTime = item.getLeft();
return modifiedTime;
}
@@ -237,44 +242,6 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver {
}
}
- private void flushJobCount() throws Exception {
- List<Pair<String, String>> jobs = JobHistoryZKStateManager.instance().getProcessedJobs(
- String.format(FORMAT_JOB_PROCESS_DATE, this.processDate.year, this.processDate.month + 1, this.processDate.day)
- );
- JobCountEntity entity = new JobCountEntity();
- entity.setTotal(jobs.size());
- entity.setFail(0);
- jobs.stream().filter(job -> !job.getRight().equals(EagleJobStatus.SUCCEEDED.toString())).forEach(
- job -> entity.setFail(1 + entity.getFail())
- );
-
- IEagleServiceClient client = new EagleServiceClientImpl(
- eagleServiceConfig.eagleServiceHost,
- eagleServiceConfig.eagleServicePort,
- eagleServiceConfig.username,
- eagleServiceConfig.password);
-
-
- GregorianCalendar cal = new GregorianCalendar(this.processDate.year, this.processDate.month, this.processDate.day, 0, 0, 0);
- cal.setTimeZone(timeZone);
- entity.setTimestamp(cal.getTimeInMillis());
- @SuppressWarnings("serial")
- Map<String, String> baseTags = new HashMap<String, String>() {
- {
- put("site", jobExtractorConfig.site);
- }
- };
- entity.setTags(baseTags);
- List<JobCountEntity> entities = new ArrayList<>();
- entities.add(entity);
-
- LOG.info("start flushing entities of total number " + entities.size());
- client.create(entities);
- LOG.info("finish flushing entities of total number " + entities.size());
- client.getJerseyClient().destroy();
- client.close();
- }
-
private void advanceOneDay() throws Exception {
//flushJobCount();
GregorianCalendar cal = new GregorianCalendar(timeZone);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
new file mode 100644
index 0000000..0e0e5e9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobCountMetricsGenerator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.mr.history.metrics;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
+import org.apache.eagle.jpm.mr.history.parser.EagleJobStatus;
+import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/** Publishes the per-day job-count metric (total and failed jobs) of one site to the Eagle service. */
+public class JobCountMetricsGenerator {
+    private static final Logger LOG = LoggerFactory.getLogger(JobCountMetricsGenerator.class);
+
+    private final MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig;
+    private final MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig;
+    private final TimeZone timeZone;
+
+    public JobCountMetricsGenerator(MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig,
+                                    MRHistoryJobConfig.JobExtractorConfig jobExtractorConfig,
+                                    TimeZone timeZone) {
+        this.eagleServiceConfig = eagleServiceConfig;
+        this.jobExtractorConfig = jobExtractorConfig;
+        this.timeZone = timeZone;
+    }
+
+    /**
+     * Counts the jobs recorded as processed for {@code date} and writes them as a single metric
+     * whose value is {@code {total, failed}}; every job whose state is not SUCCEEDED counts as failed.
+     * The metric is tagged with the configured site and stamped with the given calendar day
+     * interpreted in the generator's time zone.
+     *
+     * @param date  key used to look up the processed jobs in {@code JobHistoryZKStateManager}
+     * @param year  calendar year of the metric timestamp
+     * @param month calendar month as accepted by {@link GregorianCalendar} (zero-based)
+     * @param day   day of month of the metric timestamp
+     * @throws Exception if the processed jobs cannot be read or the metric cannot be written
+     */
+    public void flush(String date, int year, int month, int day) throws Exception {
+        List<Pair<String, String>> jobs = JobHistoryZKStateManager.instance().getProcessedJobs(date);
+        int fail = 0;
+        for (Pair<String, String> job : jobs) {
+            // Constant-first equals: a job without a recorded state counts as failed instead of throwing.
+            if (!EagleJobStatus.SUCCEEDED.toString().equals(job.getRight())) {
+                ++fail;
+            }
+        }
+
+        // The fields are set first and the zone afterwards, so the day is resolved in timeZone
+        // when the time value is computed by getTimeInMillis().
+        GregorianCalendar cal = new GregorianCalendar(year, month, day);
+        cal.setTimeZone(timeZone);
+
+        Map<String, String> baseTags = new HashMap<>();
+        baseTags.put("site", jobExtractorConfig.site);
+
+        GenericMetricEntity metricEntity = new GenericMetricEntity();
+        metricEntity.setTimestamp(cal.getTimeInMillis());
+        metricEntity.setPrefix(Constants.JOB_COUNT_PER_DAY);
+        metricEntity.setValue(new double[]{jobs.size(), fail});
+        metricEntity.setTags(baseTags);
+        List<GenericMetricEntity> entities = new ArrayList<>();
+        entities.add(metricEntity);
+
+        IEagleServiceClient client = new EagleServiceClientImpl(
+            eagleServiceConfig.eagleServiceHost,
+            eagleServiceConfig.eagleServicePort,
+            eagleServiceConfig.username,
+            eagleServiceConfig.password);
+        try {
+            LOG.info("start flushing entities of total number " + entities.size());
+            client.create(entities);
+            LOG.info("finish flushing entities of total number " + entities.size());
+        } finally {
+            // Release the client even when the write fails; it used to leak on exception.
+            client.getJerseyClient().destroy();
+            client.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
new file mode 100644
index 0000000..2129bed
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/metrics/JobExecutionMetricsCreationListener.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.mr.history.metrics;
+
+import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.jpm.util.metrics.AbstractMetricsCreationListener;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Turns a history {@link JobExecutionAPIEntity} into metrics: the job duration, the map and reduce
+ * task counts with their failure ratios, and one metric per job counter.
+ */
+public class JobExecutionMetricsCreationListener extends AbstractMetricsCreationListener<JobExecutionAPIEntity> {
+
+    /**
+     * Generates the metrics of one job; all of them share the entity's timestamp and tags.
+     *
+     * @param entity job execution entity, may be {@code null}
+     * @return the generated metrics; empty when {@code entity} is {@code null}
+     */
+    @Override
+    public List<GenericMetricEntity> generateMetrics(JobExecutionAPIEntity entity) {
+        List<GenericMetricEntity> metrics = new ArrayList<>();
+        if (entity == null) {
+            return metrics;
+        }
+
+        Long timeStamp = entity.getTimestamp();
+        Map<String, String> tags = entity.getTags();
+        metrics.add(metricWrapper(timeStamp,
+            Constants.JOB_EXECUTION_TIME,
+            new double[]{entity.getDurationTime()},
+            tags));
+
+        // Each ratio metric carries {total task count, failed / total}.
+        metrics.add(metricWrapper(
+            timeStamp,
+            Constants.MAP_COUNT_RATIO,
+            new double[]{entity.getNumTotalMaps(), failureRatio(entity.getNumFailedMaps(), entity.getNumTotalMaps())},
+            tags));
+
+        metrics.add(metricWrapper(
+            timeStamp,
+            Constants.REDUCE_COUNT_RATIO,
+            new double[]{entity.getNumTotalReduces(), failureRatio(entity.getNumFailedReduces(), entity.getNumTotalReduces())},
+            tags));
+
+        org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounters = entity.getJobCounters();
+        if (jobCounters != null && jobCounters.getCounters() != null) {
+            for (Map<String, Long> metricGroup : jobCounters.getCounters().values()) {
+                for (Map.Entry<String, Long> entry : metricGroup.entrySet()) {
+                    // Locale.ROOT keeps the metric name independent of the JVM default locale.
+                    String metricName = entry.getKey().toLowerCase(java.util.Locale.ROOT);
+                    metrics.add(metricWrapper(timeStamp, "history." + metricName, new double[]{entry.getValue()}, tags));
+                }
+            }
+        }
+        return metrics;
+    }
+
+    /**
+     * Formats {@code field} as a job-level metric name using {@code Constants.hadoopMetricFormat}.
+     *
+     * @param field metric field name
+     * @return the full metric name
+     */
+    @Override
+    public String buildMetricName(String field) {
+        return String.format(Constants.hadoopMetricFormat, Constants.JOB_LEVEL, field);
+    }
+
+    /**
+     * Returns {@code failed / total}, or {@code 0.0} when {@code total} is zero so that a job
+     * without tasks of that kind does not emit NaN or Infinity.
+     */
+    private static double failureRatio(double failed, double total) {
+        return total == 0 ? 0.0 : failed / total;
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index 30eeb54..623a776 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -19,9 +19,11 @@
package org.apache.eagle.jpm.mr.history.parser;
import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
+import org.apache.eagle.jpm.mr.history.metrics.JobExecutionMetricsCreationListener;
import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
import org.apache.eagle.jpm.mr.historyentity.*;
import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.log.entity.GenericMetricEntity;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
@@ -43,6 +45,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
List<JobEventAPIEntity> jobEvents = new ArrayList<>();
List<TaskExecutionAPIEntity> taskExecs = new ArrayList<>();
List<TaskAttemptExecutionAPIEntity> taskAttemptExecs = new ArrayList<>();
+ private JobExecutionMetricsCreationListener jobExecutionMetricsCreationListener = new JobExecutionMetricsCreationListener();
private TimeZone timeZone;
public JobEntityCreationEagleServiceListener(MRHistoryJobConfig configManager) {
@@ -91,6 +94,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
client.getJerseyClient().setReadTimeout(jobExtractorConfig.readTimeoutSeconds * 1000);
logger.info("start flushing entities of total number " + list.size());
+ List<GenericMetricEntity> metricEntities = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
JobBaseAPIEntity entity = list.get(i);
if (entity instanceof JobExecutionAPIEntity) {
@@ -98,6 +102,8 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
JobHistoryZKStateManager.instance().updateProcessedJob(timeStamp2Date(entity.getTimestamp()),
entity.getTags().get(MRJobTagName.JOB_ID.toString()),
((JobExecutionAPIEntity) entity).getCurrentState());
+
+ metricEntities.addAll(jobExecutionMetricsCreationListener.generateMetrics((JobExecutionAPIEntity)entity));
} else if (entity instanceof JobEventAPIEntity) {
jobEvents.add((JobEventAPIEntity) entity);
} else if (entity instanceof TaskExecutionAPIEntity) {
@@ -113,6 +119,12 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
checkResult(result);
jobs.clear();
}
+ if (metricEntities.size() > 0) {
+ logger.info("flush job metrics of number " + metricEntities.size());
+ result = client.create(metricEntities);
+ checkResult(result);
+ metricEntities.clear();
+ }
if (jobEvents.size() > 0) {
logger.info("flush JobEventAPIEntity of number " + jobEvents.size());
result = client.create(jobEvents);
@@ -131,6 +143,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
checkResult(result);
taskAttemptExecs.clear();
}
+
logger.info("finish flushing entities of total number " + list.size());
list.clear();
client.getJerseyClient().destroy();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java
deleted file mode 100644
index 8634b6a..0000000
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/AbstractMetricsCreationListener.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.mr.running.parser.metrics;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.GenericMetricEntity;
-
-import java.util.List;
-import java.util.Map;
-
-public abstract class AbstractMetricsCreationListener<E extends TaggedLogAPIEntity> {
-
- public abstract List<GenericMetricEntity> generateMetrics(E entity);
-
- protected abstract String buildMetricName(String field);
-
- protected GenericMetricEntity metricWrapper(Long timestamp, String field, double value, Map<String, String> tags) {
- String metricName = buildMetricName(field);
- GenericMetricEntity metricEntity = new GenericMetricEntity();
- metricEntity.setTimestamp(timestamp);
- metricEntity.setTags(tags);
- metricEntity.setPrefix(metricName);
- metricEntity.setValue(new double[]{value});
- return metricEntity;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java
index 76d2a19..8b30d45 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/JobExecutionMetricsCreationListener.java
@@ -21,6 +21,7 @@ package org.apache.eagle.jpm.mr.running.parser.metrics;
import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity;
import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.jpm.util.metrics.AbstractMetricsCreationListener;
import java.util.ArrayList;
import java.util.List;
@@ -34,15 +35,15 @@ public class JobExecutionMetricsCreationListener extends AbstractMetricsCreation
if (entity != null) {
Long currentTime = System.currentTimeMillis();
Map<String, String> tags = entity.getTags();
- metrics.add(metricWrapper(currentTime, Constants.ALLOCATED_MB, entity.getAllocatedMB(), tags));
- metrics.add(metricWrapper(currentTime, Constants.ALLOCATED_VCORES, entity.getAllocatedVCores(), tags));
- metrics.add(metricWrapper(currentTime, Constants.RUNNING_CONTAINERS, entity.getRunningContainers(), tags));
+ metrics.add(metricWrapper(currentTime, Constants.ALLOCATED_MB, new double[]{entity.getAllocatedMB()}, tags));
+ metrics.add(metricWrapper(currentTime, Constants.ALLOCATED_VCORES, new double[]{entity.getAllocatedVCores()}, tags));
+ metrics.add(metricWrapper(currentTime, Constants.RUNNING_CONTAINERS, new double[]{entity.getRunningContainers()}, tags));
org.apache.eagle.jpm.util.jobcounter.JobCounters jobCounters = entity.getJobCounters();
if (jobCounters != null && jobCounters.getCounters() != null) {
for (Map<String, Long> metricGroup : jobCounters.getCounters().values()) {
for (Map.Entry<String, Long> entry : metricGroup.entrySet()) {
String metricName = entry.getKey().toLowerCase();
- metrics.add(metricWrapper(currentTime, metricName, entry.getValue(), tags));
+ metrics.add(metricWrapper(currentTime, metricName, new double[]{entry.getValue()}, tags));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java
index d0b0d57..9f22a7f 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/metrics/TaskExecutionMetricsCreationListener.java
@@ -21,6 +21,7 @@ package org.apache.eagle.jpm.mr.running.parser.metrics;
import org.apache.eagle.jpm.mr.runningentity.TaskExecutionAPIEntity;
import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.jpm.util.metrics.AbstractMetricsCreationListener;
import java.util.ArrayList;
import java.util.List;
@@ -33,7 +34,7 @@ public class TaskExecutionMetricsCreationListener extends AbstractMetricsCreatio
if (entity != null) {
Long currentTime = System.currentTimeMillis();
Map<String, String> tags = entity.getTags();
- metrics.add(metricWrapper(currentTime, Constants.TASK_EXECUTION_TIME, entity.getDuration(), tags));
+ metrics.add(metricWrapper(currentTime, Constants.TASK_EXECUTION_TIME, new double[]{entity.getDuration()}, tags));
}
return metrics;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index ec56eac..e18fe07 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -172,7 +172,11 @@ public class Constants {
public static final String ALLOCATED_VCORES = "allocatedvcores";
public static final String RUNNING_CONTAINERS = "runningcontainers";
public static final String TASK_EXECUTION_TIME = "taskduration";
+ public static final String JOB_EXECUTION_TIME = "jobduration";
+ public static final String MAP_COUNT_RATIO = "map.count.ratio";
+ public static final String REDUCE_COUNT_RATIO = "reduce.count.ratio";
public static final String JOB_LEVEL = "job";
public static final String TASK_LEVEL = "task";
+ public static final String JOB_COUNT_PER_DAY = "hadoop.job.day.count";
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b2b16b74/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/metrics/AbstractMetricsCreationListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/metrics/AbstractMetricsCreationListener.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/metrics/AbstractMetricsCreationListener.java
new file mode 100644
index 0000000..dd61432
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/metrics/AbstractMetricsCreationListener.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.metrics;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for listeners that derive {@link GenericMetricEntity} metrics from an entity.
+ *
+ * @param <E> type of the entity the metrics are generated from
+ */
+public abstract class AbstractMetricsCreationListener<E extends TaggedLogAPIEntity> {
+
+    /**
+     * Generates the metrics for the given entity.
+     *
+     * @param entity source entity
+     * @return the generated metrics
+     */
+    public abstract List<GenericMetricEntity> generateMetrics(E entity);
+
+    /**
+     * Maps a field name to the metric name stored as the metric's prefix.
+     *
+     * @param field metric field name
+     * @return the metric name
+     */
+    protected abstract String buildMetricName(String field);
+
+    /**
+     * Creates a metric entity carrying the given timestamp, tags and values, named by
+     * {@link #buildMetricName(String)} applied to {@code field}.
+     *
+     * @param timestamp metric timestamp
+     * @param field     field name, converted to the metric name
+     * @param values    metric values, passed to the entity as given
+     * @param tags      tags, passed to the entity as given
+     * @return the populated metric entity
+     */
+    protected GenericMetricEntity metricWrapper(Long timestamp, String field, double[] values, Map<String, String> tags) {
+        final String prefix = buildMetricName(field);
+        final GenericMetricEntity metric = new GenericMetricEntity();
+        metric.setTimestamp(timestamp);
+        metric.setTags(tags);
+        metric.setPrefix(prefix);
+        metric.setValue(values);
+        return metric;
+    }
+}