You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/01/20 03:16:57 UTC
[1/2] eagle git commit: [EAGLE-859] MapReduce job performance suggestion
Repository: eagle
Updated Branches:
refs/heads/master 015d57788 -> eae6e8f11
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
index a12f589..a9f5132 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
@@ -17,7 +17,7 @@
package org.apache.eagle.jpm.analyzer.publisher;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.StringUtils;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import java.util.ArrayList;
@@ -27,14 +27,17 @@ import java.util.Map;
public class Result {
//for EagleStorePublisher
- private TaggedLogAPIEntity alertEntity = null;//TODO
+ private Map<String, List<TaggedLogAPIEntity>> alertEntities = new HashMap<>();
//for EmailPublisher
- private Map<String, List<Pair<ResultLevel, String>>> alertMessages = new HashMap<>();
+ private Map<String, List<ProcessorResult>> alertMessages = new HashMap<>();
public void addEvaluatorResult(Class<?> type, EvaluatorResult result) {
Map<Class<?>, ProcessorResult> processorResults = result.getProcessorResults();
+ Map<Class<?>, TaggedLogAPIEntity> processorEntities = result.getProcessorEntities();
+
for (Class<?> processorType : processorResults.keySet()) {
ProcessorResult processorResult = processorResults.get(processorType);
+
if (processorResult.resultLevel.equals(ResultLevel.NONE)) {
continue;
}
@@ -42,17 +45,27 @@ public class Result {
String typeName = type.getName();
if (!alertMessages.containsKey(typeName)) {
alertMessages.put(typeName, new ArrayList<>());
+ alertEntities.put(typeName, new ArrayList<>());
}
- alertMessages.get(typeName).add(Pair.of(processorResult.getResultLevel(), processorResult.getMessage()));
+ normalizeResult(processorResult);
+ alertMessages.get(typeName).add(processorResult);
+ alertEntities.get(typeName).add(processorEntities.get(processorType));
+
}
}
- public TaggedLogAPIEntity getAlertEntity() {
- return alertEntity;
+ public Map<String, List<ProcessorResult>> getAlertMessages() {
+ return alertMessages;
+ }
+
+ public Map<String, List<TaggedLogAPIEntity>> getAlertEntities() {
+ return alertEntities;
}
- public Map<String, List<Pair<ResultLevel, String>>> getAlertMessages() {
- return alertMessages;
+ private void normalizeResult(ProcessorResult processorResult) {
+ if (processorResult.getSettings() != null && !processorResult.getSettings().isEmpty()) {
+ processorResult.setSettingList(StringUtils.join(processorResult.getSettings(), "\n"));
+ }
}
/**
@@ -61,18 +74,52 @@ public class Result {
public enum ResultLevel {
NONE,
+ INFO,
NOTICE,
WARNING,
CRITICAL
}
+ public enum RuleType {
+ COMPRESS,
+ SPLIT,
+ SPILL,
+ TASK_NUMBER,
+ GC_TIME,
+ RESOURCE_CONTENTION,
+ DATA_SKEW,
+
+ LONG_STUCK_JOB,
+ LONG_DURATION_JOB
+ }
+
public static class ProcessorResult {
+ private RuleType ruleType;
private ResultLevel resultLevel;
private String message;
+ private List<String> settings;
+ private String settingList;
- public ProcessorResult(ResultLevel resultLevel, String message) {
+ public ProcessorResult(RuleType ruleType, ResultLevel resultLevel, String message, List<String> settings) {
+ this.ruleType = ruleType;
this.resultLevel = resultLevel;
this.message = message;
+ this.settings = settings;
+ }
+
+ public ProcessorResult(RuleType ruleType, ResultLevel resultLevel, String message) {
+ this.ruleType = ruleType;
+ this.resultLevel = resultLevel;
+ this.message = message;
+ this.settings = new ArrayList<>();
+ }
+
+ public RuleType getRuleType() {
+ return ruleType;
+ }
+
+ public void setRuleType(RuleType ruleType) {
+ this.ruleType = ruleType;
}
public ResultLevel getResultLevel() {
@@ -90,6 +137,22 @@ public class Result {
public void setMessage(String message) {
this.message = message;
}
+
+ public List<String> getSettings() {
+ return settings;
+ }
+
+ public void setSettings(List<String> settings) {
+ this.settings = settings;
+ }
+
+ public String getSettingList() {
+ return settingList;
+ }
+
+ public void setSettingList(String settingList) {
+ this.settingList = settingList;
+ }
}
/**
@@ -97,13 +160,22 @@ public class Result {
*/
public static class EvaluatorResult {
private Map<Class<?>, ProcessorResult> processorResults = new HashMap<>();
+ private Map<Class<?>, TaggedLogAPIEntity> processorEntities = new HashMap<>();
public void addProcessorResult(Class<?> type, ProcessorResult result) {
this.processorResults.put(type, result);
}
+ public void addProcessorEntity(Class<?> type, TaggedLogAPIEntity entity) {
+ this.processorEntities.put(type, entity);
+ }
+
public Map<Class<?>, ProcessorResult> getProcessorResults() {
return this.processorResults;
}
+
+ public Map<Class<?>, TaggedLogAPIEntity> getProcessorEntities() {
+ return processorEntities;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
index 4b18f7c..6a51a76 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
@@ -17,7 +17,7 @@
package org.apache.eagle.jpm.analyzer.publisher.dedup;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
import org.apache.eagle.jpm.analyzer.publisher.Result;
public interface AlertDeduplicator {
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
index 09f1af6..b139b3c 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
@@ -17,7 +17,7 @@
package org.apache.eagle.jpm.analyzer.publisher.dedup.impl;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
import org.apache.eagle.jpm.analyzer.publisher.Result;
import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
import org.apache.eagle.jpm.analyzer.util.Constants;
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
index 774e6d2..4c6661a 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
@@ -62,4 +62,5 @@ public class Constants {
public static final String ANALYZER_REPORT_DATA_BASIC_KEY = "basic";
public static final String ANALYZER_REPORT_DATA_EXTEND_KEY = "extend";
+
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
index 39cec68..996adba 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
@@ -115,13 +115,17 @@
<table class="body-wrap" style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; width: 100%; background-color: #f6f6f6; margin: 0;" bgcolor="#f6f6f6" border="1">
<caption><b>Analysis By $evaluator</b></caption>
<tr>
- <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="100"><b>level</b></td>
+ <th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="100"><b>type</b></th>
<th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="250"><b>message</b></th>
+ <th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="250"><b>optimizer setting</b></th>
+ <th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="100"><b>level</b></th>
</tr>
- #foreach($message in ${elem["extend"].get($evaluator).keySet()})
+ #foreach($result in ${elem["extend"].get($evaluator)})
<tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
- <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;"><b>${elem["extend"].get($evaluator).get($message)}</b></td>
- <th style="...">$message</th>
+ <td style="...">${result.ruleType}</td>
+ <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">${result.message}</td>
+ <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">${result.settingList}</td>
+ <td style="...">${result.resultLevel}</td>
</tr>
#end
</table>
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
index cbbdad3..8c65adf 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
@@ -38,5 +38,6 @@ public class JPAEntityRepository extends EntityRepository {
entitySet.add(JobProcessTimeStampEntity.class);
entitySet.add(JobCountEntity.class);
entitySet.add(TaskAttemptErrorCategoryEntity.class);
+ entitySet.add(JobSuggestionAPIEntity.class);
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java
new file mode 100644
index 0000000..3863a5d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.historyentity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+
+import java.util.List;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import static org.apache.eagle.jpm.util.Constants.JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("eaglejpa")
+@ColumnFamily("f")
+@Prefix("jsuggestion")
+@Service(JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+@Indexes({
+ @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = true),
+ @Index(name = "Index_2_jobDefId", columns = { "jobDefId" }, unique = false)
+ })
+public class JobSuggestionAPIEntity extends TaggedLogAPIEntity {
+ @Column("a")
+ private String optimizerSuggestion;
+ @Column("b")
+ private List<String> optimizerSettings;
+
+ public String getOptimizerSuggestion() {
+ return optimizerSuggestion;
+ }
+
+ public void setOptimizerSuggestion(String optimizerSuggestion) {
+ this.optimizerSuggestion = optimizerSuggestion;
+ valueChanged("optimizerSuggestion");
+ }
+
+ public List<String> getOptimizerSettings() {
+ return optimizerSettings;
+ }
+
+ public void setOptimizerSettings(List<String> optimizerSettings) {
+ this.optimizerSettings = optimizerSettings;
+ valueChanged("optimizerSettings");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
index 8db7f5c..46fcf5e 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
@@ -46,6 +46,40 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
private String error;
@Column("f")
private JobCounters jobCounters;
+ // new added
+ @Column("g")
+ private long shuffleFinishTime;
+ @Column("h")
+ private long sortFinishTime;
+ @Column("i")
+ private long mapFinishTime;
+
+ public long getShuffleFinishTime() {
+ return shuffleFinishTime;
+ }
+
+ public void setShuffleFinishTime(long shuffleFinishTime) {
+ this.shuffleFinishTime = shuffleFinishTime;
+ valueChanged("shuffleFinishTime");
+ }
+
+ public long getSortFinishTime() {
+ return sortFinishTime;
+ }
+
+ public void setSortFinishTime(long sortFinishTime) {
+ this.sortFinishTime = sortFinishTime;
+ valueChanged("sortFinishTime");
+ }
+
+ public long getMapFinishTime() {
+ return mapFinishTime;
+ }
+
+ public void setMapFinishTime(long mapFinishTime) {
+ this.mapFinishTime = mapFinishTime;
+ valueChanged("mapFinishTime");
+ }
public String getTaskStatus() {
return taskStatus;
@@ -53,7 +87,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
public void setTaskStatus(String taskStatus) {
this.taskStatus = taskStatus;
- pcs.firePropertyChange("taskStatus", null, null);
+ valueChanged("taskStatus");
}
public long getStartTime() {
@@ -62,7 +96,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
public void setStartTime(long startTime) {
this.startTime = startTime;
- pcs.firePropertyChange("startTime", null, null);
+ valueChanged("startTime");
}
public long getEndTime() {
@@ -71,7 +105,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
public void setEndTime(long endTime) {
this.endTime = endTime;
- pcs.firePropertyChange("endTime", null, null);
+ valueChanged("endTime");
}
public long getDuration() {
@@ -80,7 +114,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
public void setDuration(long duration) {
this.duration = duration;
- pcs.firePropertyChange("duration", null, null);
+ valueChanged("duration");
}
public String getError() {
@@ -89,7 +123,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
public void setError(String error) {
this.error = error;
- pcs.firePropertyChange("error", null, null);
+ valueChanged("error");
}
public JobCounters getJobCounters() {
@@ -98,6 +132,6 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
public void setJobCounters(JobCounters jobCounters) {
this.jobCounters = jobCounters;
- pcs.firePropertyChange("jobCounters", null, null);
+ valueChanged("jobCounters");
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml
index 349c489..2e3a1a8 100644
--- a/eagle-jpm/eagle-jpm-mr-history/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml
@@ -89,6 +89,11 @@
<version>1.6</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-jpm-analyzer</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
<resources>
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
index 262054e..28ebf4e 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
@@ -21,6 +21,7 @@ package org.apache.eagle.jpm.mr.history.crawler;
import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
import org.apache.eagle.jpm.mr.history.parser.JHFParserBase;
import org.apache.eagle.jpm.mr.history.parser.JHFParserFactory;
+import org.apache.eagle.jpm.util.MRJobTagName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback {
@SuppressWarnings("serial")
Map<String, String> baseTags = new HashMap<String, String>() {
{
- put("site", appConfig.getJobHistoryEndpointConfig().site);
+ put(MRJobTagName.SITE.toString(), appConfig.getJobHistoryEndpointConfig().site);
}
};
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index d89937e..d58eadc 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -81,6 +81,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
private long sumReduceTaskDuration;
private JobCounterMetricsGenerator jobCounterMetricsGenerator;
+ private JobSuggestionListener jobSuggestionListener;
private MRHistoryJobConfig appConfig;
@@ -127,6 +128,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
this.appConfig = appConfig;
this.jobCounterMetricsGenerator = new JobCounterMetricsGenerator(appConfig.getEagleServiceConfig());
+ this.jobSuggestionListener = new JobSuggestionListener(appConfig.getConfig());
+ this.addListener(jobSuggestionListener);
}
public void register(HistoryJobEntityLifecycleListener lifecycleListener) {
@@ -179,7 +182,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
this.jobType = jobType;
}
- protected void handleJob(EventType eventType, Map<Keys, String> values, Object totalCounters) throws Exception {
+ protected void handleJob(EventType eventType, Map<Keys, String> values, Object totalCounters, Object mapCounters, Object reduceCounters) throws Exception {
String id = values.get(Keys.JOBID);
if (jobId == null) {
@@ -300,8 +303,15 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
this.jobCounterMetricsGenerator.setBaseTags(jobExecutionEntity.getTags());
formatDiagnostics(values.get(Keys.DIAGNOSTICS));
-
entityCreated(jobExecutionEntity);
+
+ if (configuration != null && totalCounters != null) {
+ JobCounters parsedTotalCounters = parseCounters(totalCounters);
+ JobCounters parsedMapCounters = parseCounters(mapCounters);
+ JobCounters parsedReduceCounters = parseCounters(reduceCounters);
+ jobSuggestionListener.jobCountersCreated(parsedTotalCounters, parsedMapCounters, parsedReduceCounters);
+ jobSuggestionListener.jobConfigCreated(configuration);
+ }
}
}
@@ -332,7 +342,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
}
}
- super.notifiyListeners(entity);
+ super.notifyListeners(entity);
}
protected abstract JobCounters parseCounters(Object value) throws IOException;
@@ -432,7 +442,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
// it is very likely that an attempt ID could be both succeeded and failed due to M/R system
// in this case, we should ignore this attempt?
if (taskAttemptStartTime.get(taskAttemptID) == null) {
- LOG.warn("task attemp has consistency issue " + taskAttemptID);
+ LOG.warn("task attempt has consistency issue " + taskAttemptID);
return;
}
entity.setStartTime(taskAttemptStartTime.get(taskAttemptID));
@@ -441,6 +451,15 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
entity.setDuration(entity.getEndTime() - entity.getStartTime());
entity.setTaskStatus(values.get(Keys.TASK_STATUS));
entity.setError(values.get(Keys.ERROR));
+ if (values.containsKey(Keys.SHUFFLE_FINISHED)) {
+ entity.setShuffleFinishTime(Long.valueOf(values.get(Keys.SHUFFLE_FINISHED)));
+ }
+ if (values.containsKey(Keys.SORT_FINISHED)) {
+ entity.setSortFinishTime(Long.valueOf(values.get(Keys.SORT_FINISHED)));
+ }
+ if (values.containsKey(Keys.MAP_FINISH_TIME)) {
+ entity.setMapFinishTime(Long.valueOf(values.get(Keys.MAP_FINISH_TIME)));
+ }
if (values.get(Keys.COUNTERS) != null || counters != null) { // when task is killed, COUNTERS does not exist
//entity.setJobCounters(parseCounters(values.get(Keys.COUNTERS)));
entity.setJobCounters(parseCounters(counters));
@@ -473,7 +492,6 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
taskAttemptErrorCategoryEntity.setTimestamp(entity.getTimestamp());
entityCreated(taskAttemptErrorCategoryEntity);
}
-
taskAttemptStartTime.remove(taskAttemptID);
} else {
// silently ignore
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
index adeb41e..615e9ad 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
@@ -18,6 +18,7 @@
package org.apache.eagle.jpm.mr.history.parser;
+@Deprecated
public enum JHFFormat {
MRVer1,
MRVer2
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
index 8184f90..1903fc9 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -185,7 +185,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
if (js.getJobQueueName() != null) {
values.put(Keys.JOB_QUEUE, js.getJobQueueName().toString());
}
- handleJob(wrapper.getType(), values, null);
+ handleJob(wrapper.getType(), values, null, null, null);
}
private void handleJobInited(Event wrapper) throws Exception {
@@ -209,7 +209,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
if (js.getUberized() != null) {
values.put(Keys.UBERISED, js.getUberized().toString());
}
- handleJob(wrapper.getType(), values, null);
+ handleJob(wrapper.getType(), values, null, null, null);
}
private void handleJobFinished(Event wrapper) throws Exception {
@@ -234,7 +234,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
values.put(Keys.FAILED_REDUCES, js.getFailedReduces().toString());
}
values.put(Keys.JOB_STATUS, EagleJobStatus.SUCCEEDED.name());
- handleJob(wrapper.getType(), values, js.getTotalCounters());
+ handleJob(wrapper.getType(), values, js.getTotalCounters(), js.getMapCounters(), js.getReduceCounters());
}
private void handleJobUnsuccessfulCompletion(Event wrapper) throws Exception {
@@ -258,7 +258,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
if (js.getDiagnostics() != null) {
values.put(Keys.DIAGNOSTICS, js.getDiagnostics().toString());
}
- handleJob(wrapper.getType(), values, null);
+ handleJob(wrapper.getType(), values, null, null, null);
}
private void handleTaskStarted(Event wrapper) throws Exception {
@@ -539,6 +539,9 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
}
protected JobCounters parseCounters(Object value) throws IOException {
+ if (value == null) {
+ return null;
+ }
JobCounters jc = new JobCounters();
Map<String, Map<String, Long>> groups = new HashMap<>();
JhCounters counters = (JhCounters) value;
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
index ca49d9c..1d17640 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -38,11 +38,13 @@ public class JHFParserFactory {
MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = appConfig.getEagleServiceConfig();
JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter, appConfig);
+ // add HistoryJobEntityCreationListener
reader2.addListener(new JobEntityCreationEagleServiceListener(appConfig));
reader2.addListener(new TaskFailureListener(eagleServiceConfig));
reader2.addListener(new TaskAttemptCounterListener(eagleServiceConfig));
reader2.addListener(new JobConfigurationCreationServiceListener(eagleServiceConfig));
+ // add HistoryJobEntityLifecycleListener
reader2.register(new JobEntityLifecycleAggregator());
JHFParserBase parser = new JHFMRVer2Parser(reader2);
return parser;
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
index a80462d..f2730fd 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
@@ -29,7 +29,7 @@ public class JobEntityCreationPublisher {
listeners.add(l);
}
- public void notifiyListeners(JobBaseAPIEntity entity) throws Exception {
+ public void notifyListeners(JobBaseAPIEntity entity) throws Exception {
for (HistoryJobEntityCreationListener l : listeners) {
l.jobEntityCreated(entity);
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java
new file mode 100644
index 0000000..e5b0d2e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
+import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.jpm.mr.history.parser.JHFEventReaderBase.Keys;
+import org.apache.hadoop.conf.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.eagle.jpm.util.MRJobTagName.TASK_ATTEMPT_ID;
+import static org.apache.eagle.jpm.util.MRJobTagName.TASK_ID;
+
+/**
+ * JobSuggestionListener collects what the job history parser emits for a job (the job entity,
+ * its task and task-attempt entities, the job configuration and the job counters) into a single
+ * {@link MapReduceAnalyzerEntity}, and runs {@link MRJobPerformanceAnalyzer} on it when
+ * {@link #flush()} is called.
+ *
+ * <p>NOTE(review): the aggregated entity is created once in the constructor and never reset, so an
+ * instance presumably serves a single job history file — confirm against the parser factory.
+ */
+public class JobSuggestionListener implements HistoryJobEntityCreationListener {
+    private static final Logger LOG = LoggerFactory.getLogger(JobSuggestionListener.class);
+
+    private final MapReduceAnalyzerEntity info;
+    private final MRJobPerformanceAnalyzer<MapReduceAnalyzerEntity> analyzer;
+
+    /**
+     * @param config application config used to build the performance analyzer
+     */
+    public JobSuggestionListener(Config config) {
+        this.info = new MapReduceAnalyzerEntity();
+        this.analyzer = new MRJobPerformanceAnalyzer<>(config);
+    }
+
+    /**
+     * Adds a parsed entity to the aggregated analyzer entity. Tasks are keyed by their task id tag
+     * and task attempts by their task attempt id tag. For the job entity, its state, timing,
+     * identifying tags and map/reduce counts are copied. Other entity types are ignored.
+     */
+    @Override
+    public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
+        if (entity instanceof TaskExecutionAPIEntity) {
+            info.getTasksMap().put(entity.getTags().get(TASK_ID.toString()), (TaskExecutionAPIEntity) entity);
+        } else if (entity instanceof TaskAttemptExecutionAPIEntity) {
+            info.getCompletedTaskAttemptsMap().put(
+                entity.getTags().get(TASK_ATTEMPT_ID.toString()), (TaskAttemptExecutionAPIEntity) entity);
+        } else if (entity instanceof JobExecutionAPIEntity) {
+            JobExecutionAPIEntity jobExecutionAPIEntity = (JobExecutionAPIEntity) entity;
+            info.setCurrentState(jobExecutionAPIEntity.getCurrentState());
+            info.setStartTime(jobExecutionAPIEntity.getStartTime());
+            info.setEndTime(jobExecutionAPIEntity.getEndTime());
+            info.setDurationTime(jobExecutionAPIEntity.getDurationTime());
+            info.setUserId(jobExecutionAPIEntity.getTags().get(MRJobTagName.USER.toString()));
+            info.setJobId(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_ID.toString()));
+            info.setJobDefId(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
+            info.setSiteId(jobExecutionAPIEntity.getTags().get(MRJobTagName.SITE.toString()));
+            info.setJobName(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_NAME.toString()));
+            info.setJobQueueName(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_QUEUE.toString()));
+            info.setJobType(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_TYPE.toString()));
+            info.setFinishedMaps(jobExecutionAPIEntity.getNumFinishedMaps());
+            info.setFinishedReduces(jobExecutionAPIEntity.getNumFinishedReduces());
+            info.setFailedReduces(jobExecutionAPIEntity.getNumFailedReduces());
+            info.setFailedMaps(jobExecutionAPIEntity.getNumFailedMaps());
+            info.setTotalMaps(jobExecutionAPIEntity.getNumTotalMaps());
+            info.setTotalReduces(jobExecutionAPIEntity.getNumTotalReduces());
+        }
+    }
+
+    /**
+     * Stores the parsed job configuration on the aggregated analyzer entity.
+     *
+     * @param configuration the job's Hadoop configuration
+     */
+    public void jobConfigCreated(Configuration configuration) {
+        info.setJobConf(configuration);
+    }
+
+    /**
+     * Stores the job-level, map-side and reduce-side counters on the aggregated analyzer entity.
+     */
+    public void jobCountersCreated(JobCounters totalCounters, JobCounters mapCounters, JobCounters reduceCounters) {
+        info.setTotalCounters(totalCounters);
+        info.setReduceCounters(reduceCounters);
+        info.setMapCounters(mapCounters);
+    }
+
+    /**
+     * Runs the performance analyzer over the aggregated entity. Analysis is skipped, with a
+     * warning, when no job entity has been received: the entity would carry no job id.
+     */
+    @Override
+    public void flush() throws Exception {
+        if (info.getJobId() == null) {
+            LOG.warn("Skip job performance analysis: no job entity was received before flush");
+            return;
+        }
+        analyzer.analyze(info);
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
index 856f051..24c734d 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
@@ -18,7 +18,6 @@
package org.apache.eagle.jpm.mr.history.parser;
-import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
import org.apache.eagle.jpm.mr.historyentity.TaskAttemptCounterAPIEntity;
import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index e8d5311..3836e3a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -49,7 +49,7 @@
"service": {
"host": "sandbox.hortonworks.com",
- "port": 9099,
+ "port": 9090,
"username": "admin",
"password": "secret",
"readTimeOutSeconds" : 10,
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 120303e..6b33d31 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -19,7 +19,7 @@
package org.apache.eagle.jpm.mr.running.parser;
import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
@@ -173,7 +173,7 @@ public class MRJobParser implements Runnable {
break;
}
}
- mrJobPerformanceAnalyzer.analysis(convertToAnalysisEntity(mrJobEntityMap.get(jobId)));
+ mrJobPerformanceAnalyzer.analyze(convertToAnalysisEntity(mrJobEntityMap.get(jobId)));
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index 0ba6521..4ee58a1 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -117,6 +117,7 @@ public class Constants {
public static final String JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME = "RunningTaskExecutionService";
public static final String JPA_RUNNING_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "RunningTaskAttemptExecutionService";
public static final String JPA_JOB_PROCESS_TIME_STAMP_NAME = "JobProcessTimeStampService";
+ public static final String JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME = "JobOptimizerSuggestionService";
public static final String JOB_TASK_TYPE_TAG = "taskType";
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
index 8def44f..bbb80cd 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
@@ -48,7 +48,11 @@ public final class JobCounters implements Serializable {
}
public Long getCounterValue(CounterName counterName) {
- return counters.get(counterName.group.name).get(counterName.name);
+ if (counters.get(counterName.group.name).containsKey(counterName.name)) {
+ return counters.get(counterName.group.name).get(counterName.name);
+ } else {
+ return 0L;
+ }
}
public static enum GroupName {
[2/2] eagle git commit: [EAGLE-859] MapReduce job performance
suggestion
Posted by qi...@apache.org.
[EAGLE-859] MapReduce job performance suggestion
https://issues.apache.org/jira/browse/EAGLE-859
Author: Zhao, Qingwen <qi...@apache.org>
Author: Qingwen Zhao <qi...@gmail.com>
Closes #784 from qingwen220/EAGLE-859.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/eae6e8f1
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/eae6e8f1
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/eae6e8f1
Branch: refs/heads/master
Commit: eae6e8f11367c397c3a9ce39f2c59cf341f33847
Parents: 015d577
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Fri Jan 20 11:14:56 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Fri Jan 20 11:14:56 2017 +0800
----------------------------------------------------------------------
eagle-jpm/eagle-jpm-analyzer/pom.xml | 5 +
.../eagle/jpm/analyzer/AnalyzerEntity.java | 130 ------------
.../apache/eagle/jpm/analyzer/Evaluator.java | 5 +-
.../apache/eagle/jpm/analyzer/JobAnalyzer.java | 10 +-
.../apache/eagle/jpm/analyzer/Processor.java | 5 +-
.../jpm/analyzer/meta/model/AnalyzerEntity.java | 130 ++++++++++++
.../meta/model/MapReduceAnalyzerEntity.java | 174 +++++++++++++++
.../analyzer/mr/MRJobPerformanceAnalyzer.java | 10 +-
.../jpm/analyzer/mr/sla/SLAJobEvaluator.java | 7 +-
.../sla/processors/LongStuckJobProcessor.java | 4 +-
.../UnExpectedLongDurationJobProcessor.java | 8 +-
.../mr/suggestion/JobSuggestionEvaluator.java | 74 ++++++-
.../MapReduceCompressionSettingProcessor.java | 82 ++++++++
.../suggestion/MapReduceDataSkewProcessor.java | 63 ++++++
.../mr/suggestion/MapReduceGCTimeProcessor.java | 74 +++++++
.../MapReduceJobSuggestionContext.java | 209 +++++++++++++++++++
.../MapReduceQueueResourceProcessor.java | 85 ++++++++
.../mr/suggestion/MapReduceSpillProcessor.java | 125 +++++++++++
.../MapReduceSplitSettingProcessor.java | 47 +++++
.../suggestion/MapReduceTaskNumProcessor.java | 194 +++++++++++++++++
.../analyzer/publisher/EagleStorePublisher.java | 40 +++-
.../jpm/analyzer/publisher/EmailPublisher.java | 21 +-
.../eagle/jpm/analyzer/publisher/Publisher.java | 2 +-
.../eagle/jpm/analyzer/publisher/Result.java | 90 +++++++-
.../publisher/dedup/AlertDeduplicator.java | 2 +-
.../dedup/impl/SimpleDeduplicator.java | 2 +-
.../eagle/jpm/analyzer/util/Constants.java | 1 +
.../main/resources/AnalyzerReportTemplate.vm | 12 +-
.../mr/historyentity/JPAEntityRepository.java | 1 +
.../historyentity/JobSuggestionAPIEntity.java | 63 ++++++
.../TaskAttemptExecutionAPIEntity.java | 46 +++-
eagle-jpm/eagle-jpm-mr-history/pom.xml | 5 +
.../crawler/DefaultJHFInputStreamCallback.java | 3 +-
.../mr/history/parser/JHFEventReaderBase.java | 28 ++-
.../eagle/jpm/mr/history/parser/JHFFormat.java | 1 +
.../mr/history/parser/JHFMRVer2EventReader.java | 11 +-
.../jpm/mr/history/parser/JHFParserFactory.java | 2 +
.../parser/JobEntityCreationPublisher.java | 2 +-
.../history/parser/JobSuggestionListener.java | 94 +++++++++
.../parser/TaskAttemptCounterListener.java | 1 -
.../src/main/resources/application.conf | 2 +-
.../jpm/mr/running/parser/MRJobParser.java | 4 +-
.../org/apache/eagle/jpm/util/Constants.java | 1 +
.../eagle/jpm/util/jobcounter/JobCounters.java | 6 +-
44 files changed, 1674 insertions(+), 207 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/pom.xml b/eagle-jpm/eagle-jpm-analyzer/pom.xml
index 50fb803..07f5766 100644
--- a/eagle-jpm/eagle-jpm-analyzer/pom.xml
+++ b/eagle-jpm/eagle-jpm-analyzer/pom.xml
@@ -47,6 +47,11 @@
</dependency>
<dependency>
<groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-jpm-entity</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
<artifactId>eagle-app-base</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
deleted file mode 100644
index f9b7af0..0000000
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.analyzer;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * will refactor later if other types of job needs this.
- * AnalyzerEntity for each job needed to be analysised
- */
-public class AnalyzerEntity {
- private String jobDefId;
- private String jobId;
- private String siteId;
- private String userId;
-
- private long startTime;
- private long endTime;
- private long durationTime;
- private String currentState;
- private double progress;
-
- private Map<String, Object> jobConfig = new HashMap<>();
-
- private Map<String, Object> jobMeta = new HashMap<>();
-
- public String getJobDefId() {
- return jobDefId;
- }
-
- public void setJobDefId(String jobDefId) {
- this.jobDefId = jobDefId;
- }
-
- public String getJobId() {
- return jobId;
- }
-
- public void setJobId(String jobId) {
- this.jobId = jobId;
- }
-
- public String getSiteId() {
- return siteId;
- }
-
- public void setSiteId(String siteId) {
- this.siteId = siteId;
- }
-
- public String getUserId() {
- return userId;
- }
-
- public void setUserId(String userId) {
- this.userId = userId;
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public void setStartTime(long startTime) {
- this.startTime = startTime;
- }
-
- public long getEndTime() {
- return endTime;
- }
-
- public void setEndTime(long endTime) {
- this.endTime = endTime;
- }
-
- public long getDurationTime() {
- return durationTime;
- }
-
- public void setDurationTime(long durationTime) {
- this.durationTime = durationTime;
- }
-
- public String getCurrentState() {
- return currentState;
- }
-
- public void setCurrentState(String currentState) {
- this.currentState = currentState;
- }
-
- public Map<String, Object> getJobConfig() {
- return jobConfig;
- }
-
- public void setJobConfig(Map<String, Object> jobConfig) {
- this.jobConfig = jobConfig;
- }
-
- public Map<String, Object> getJobMeta() {
- return jobMeta;
- }
-
- public void setJobMeta(Map<String, Object> jobMeta) {
- this.jobMeta = jobMeta;
- }
-
- public double getProgress() {
- return progress;
- }
-
- public void setProgress(double progress) {
- this.progress = progress;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
index 6617916..60ee8d6 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
@@ -17,8 +17,9 @@
package org.apache.eagle.jpm.analyzer;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
import org.apache.eagle.jpm.analyzer.publisher.Result;
-public interface Evaluator {
- Result.EvaluatorResult evaluate(AnalyzerEntity analyzerEntity);
+/**
+ * An Evaluator is a group of Processors that together evaluate one job.
+ *
+ * @param <T> the AnalyzerEntity type this evaluator accepts
+ */
+public interface Evaluator<T extends AnalyzerEntity> {
+ /**
+ * Evaluates a single job.
+ *
+ * @param analyzerEntity the job to evaluate
+ * @return the evaluation result; MRJobPerformanceAnalyzer skips a null result
+ */
+ Result.EvaluatorResult evaluate(T analyzerEntity);
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
index 6cda1cd..1e9c00e 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
@@ -17,12 +17,14 @@
package org.apache.eagle.jpm.analyzer;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
+
/**
- * Each JobAnalyzer contains one or more Evaluators to analysis each job.
+ * Each JobAnalyzer contains one or more Evaluators to analyze each job.
* Each Evaluator is a group of Processors
- * Each Processor implements an algorithm or a model to analysis one dimension of a job
+ * Each Processor implements an algorithm or a model to analyze one dimension of a job
*
+ * @param <T> the AnalyzerEntity type this analyzer accepts
*/
-public interface JobAnalyzer {
- void analysis(AnalyzerEntity analyzerEntity) throws Exception;
+public interface JobAnalyzer<T extends AnalyzerEntity> {
+ /**
+ * Analyzes one job with this analyzer's evaluators.
+ *
+ * @param analyzerEntity the job to analyze
+ * @throws Exception if evaluation or publishing of the result fails
+ */
+ void analyze(T analyzerEntity) throws Exception;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
index d5a8a74..419e402 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
@@ -17,8 +17,9 @@
package org.apache.eagle.jpm.analyzer;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
import org.apache.eagle.jpm.analyzer.publisher.Result;
-public interface Processor {
- Result.ProcessorResult process(AnalyzerEntity jobAnalysisEntity);
+/**
+ * A Processor implements one algorithm or model that analyzes a single dimension of a job.
+ * Evaluators are composed of Processors.
+ *
+ * @param <T> the AnalyzerEntity type this processor accepts
+ */
+public interface Processor<T extends AnalyzerEntity> {
+ /**
+ * Analyzes one dimension of the given job.
+ *
+ * @param jobAnalysisEntity the job to analyze
+ * @return this processor's result for the job
+ */
+ Result.ProcessorResult process(T jobAnalysisEntity);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java
new file mode 100644
index 0000000..189d85d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.model;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Job-level facts shared by every analyzer. It holds the job definition, job, site and user ids,
+ * the job's timing, state and progress, and free-form configuration and metadata maps.
+ * It is kept generic so that other job types can reuse or extend it later.
+ */
+public class AnalyzerEntity {
+    private String jobDefId;
+    private String jobId;
+    private String siteId;
+    private String userId;
+
+    private long startTime;
+    private long endTime;
+    private long durationTime;
+    private String currentState;
+    private double progress;
+
+    private Map<String, Object> jobConfig = new HashMap<>();
+
+    private Map<String, Object> jobMeta = new HashMap<>();
+
+    // ------------------------------------------------------------------ accessors
+
+    public String getJobDefId() {
+        return this.jobDefId;
+    }
+
+    public String getJobId() {
+        return this.jobId;
+    }
+
+    public String getSiteId() {
+        return this.siteId;
+    }
+
+    public String getUserId() {
+        return this.userId;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public long getEndTime() {
+        return this.endTime;
+    }
+
+    public long getDurationTime() {
+        return this.durationTime;
+    }
+
+    public String getCurrentState() {
+        return this.currentState;
+    }
+
+    public double getProgress() {
+        return this.progress;
+    }
+
+    public Map<String, Object> getJobConfig() {
+        return this.jobConfig;
+    }
+
+    public Map<String, Object> getJobMeta() {
+        return this.jobMeta;
+    }
+
+    // ------------------------------------------------------------------ mutators
+
+    public void setJobDefId(String newJobDefId) {
+        jobDefId = newJobDefId;
+    }
+
+    public void setJobId(String newJobId) {
+        jobId = newJobId;
+    }
+
+    public void setSiteId(String newSiteId) {
+        siteId = newSiteId;
+    }
+
+    public void setUserId(String newUserId) {
+        userId = newUserId;
+    }
+
+    public void setStartTime(long newStartTime) {
+        startTime = newStartTime;
+    }
+
+    public void setEndTime(long newEndTime) {
+        endTime = newEndTime;
+    }
+
+    public void setDurationTime(long newDurationTime) {
+        durationTime = newDurationTime;
+    }
+
+    public void setCurrentState(String newCurrentState) {
+        currentState = newCurrentState;
+    }
+
+    public void setProgress(double newProgress) {
+        progress = newProgress;
+    }
+
+    public void setJobConfig(Map<String, Object> newJobConfig) {
+        jobConfig = newJobConfig;
+    }
+
+    public void setJobMeta(Map<String, Object> newJobMeta) {
+        jobMeta = newJobMeta;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java
new file mode 100644
index 0000000..cd6249d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.model;
+
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * MapReduce-specific analyzer input. It extends the generic job facts with queue and type
+ * information, map/reduce task counts, job/map/reduce counters, the per-task and per-attempt
+ * entities, and the job's Hadoop configuration.
+ */
+public class MapReduceAnalyzerEntity extends AnalyzerEntity {
+    private String jobName = "";
+    private String jobQueueName = "";
+    private String jobType;
+    private int totalMaps;
+    private int totalReduces;
+    private int failedMaps;
+    private int failedReduces;
+    private int finishedMaps = 0;
+    private int finishedReduces = 0;
+    private JobCounters totalCounters;
+    private JobCounters mapCounters;
+    private JobCounters reduceCounters;
+    private Map<String, TaskExecutionAPIEntity> tasksMap = new HashMap<>();
+    private Map<String, TaskAttemptExecutionAPIEntity> completedTaskAttemptsMap = new HashMap<>();
+    private Configuration jobConf;
+
+    public MapReduceAnalyzerEntity() {
+        // NOTE(review): -1 presumably marks "time not known yet" until the job entity arrives.
+        this.setEndTime(-1);
+        this.setStartTime(-1);
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String name) {
+        this.jobName = name;
+    }
+
+    public String getJobQueueName() {
+        return jobQueueName;
+    }
+
+    public void setJobQueueName(String queueName) {
+        this.jobQueueName = queueName;
+    }
+
+    public String getJobType() {
+        return jobType;
+    }
+
+    public void setJobType(String type) {
+        this.jobType = type;
+    }
+
+    public int getTotalMaps() {
+        return totalMaps;
+    }
+
+    public void setTotalMaps(int count) {
+        this.totalMaps = count;
+    }
+
+    public int getTotalReduces() {
+        return totalReduces;
+    }
+
+    public void setTotalReduces(int count) {
+        this.totalReduces = count;
+    }
+
+    public int getFailedMaps() {
+        return failedMaps;
+    }
+
+    public void setFailedMaps(int count) {
+        this.failedMaps = count;
+    }
+
+    public int getFailedReduces() {
+        return failedReduces;
+    }
+
+    public void setFailedReduces(int count) {
+        this.failedReduces = count;
+    }
+
+    public int getFinishedMaps() {
+        return finishedMaps;
+    }
+
+    public void setFinishedMaps(int count) {
+        this.finishedMaps = count;
+    }
+
+    public int getFinishedReduces() {
+        return finishedReduces;
+    }
+
+    public void setFinishedReduces(int count) {
+        this.finishedReduces = count;
+    }
+
+    public JobCounters getTotalCounters() {
+        return totalCounters;
+    }
+
+    public void setTotalCounters(JobCounters counters) {
+        this.totalCounters = counters;
+    }
+
+    public JobCounters getMapCounters() {
+        return mapCounters;
+    }
+
+    public void setMapCounters(JobCounters counters) {
+        this.mapCounters = counters;
+    }
+
+    public JobCounters getReduceCounters() {
+        return reduceCounters;
+    }
+
+    public void setReduceCounters(JobCounters counters) {
+        this.reduceCounters = counters;
+    }
+
+    public Map<String, TaskExecutionAPIEntity> getTasksMap() {
+        return tasksMap;
+    }
+
+    public void setTasksMap(Map<String, TaskExecutionAPIEntity> tasks) {
+        this.tasksMap = tasks;
+    }
+
+    public Map<String, TaskAttemptExecutionAPIEntity> getCompletedTaskAttemptsMap() {
+        return completedTaskAttemptsMap;
+    }
+
+    public void setCompletedTaskAttemptsMap(Map<String, TaskAttemptExecutionAPIEntity> attempts) {
+        this.completedTaskAttemptsMap = attempts;
+    }
+
+    public Configuration getJobConf() {
+        return jobConf;
+    }
+
+    public void setJobConf(Configuration conf) {
+        this.jobConf = conf;
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
index e0e579a..e32a37c 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
@@ -20,6 +20,7 @@ package org.apache.eagle.jpm.analyzer.mr;
import com.typesafe.config.Config;
import org.apache.eagle.jpm.analyzer.*;
import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator;
import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator;
import org.apache.eagle.jpm.analyzer.publisher.EagleStorePublisher;
@@ -33,7 +34,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-public class MRJobPerformanceAnalyzer implements JobAnalyzer, Serializable {
+public class MRJobPerformanceAnalyzer<T extends AnalyzerEntity> implements JobAnalyzer<T>, Serializable {
private static final Logger LOG = LoggerFactory.getLogger(MRJobPerformanceAnalyzer.class);
private List<Evaluator> evaluators = new ArrayList<>();
@@ -51,11 +52,14 @@ public class MRJobPerformanceAnalyzer implements JobAnalyzer, Serializable {
}
@Override
- public void analysis(AnalyzerEntity analyzerJobEntity) throws Exception {
+ public void analyze(T analyzerJobEntity) throws Exception {
Result result = new Result();
for (Evaluator evaluator : evaluators) {
- result.addEvaluatorResult(evaluator.getClass(), evaluator.evaluate(analyzerJobEntity));
+ Result.EvaluatorResult evaluatorResult = evaluator.evaluate(analyzerJobEntity);
+ if (evaluatorResult != null) {
+ result.addEvaluatorResult(evaluator.getClass(), evaluatorResult);
+ }
}
for (Publisher publisher : publishers) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
index f10b68d..a77e55d 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
@@ -18,7 +18,7 @@
package org.apache.eagle.jpm.analyzer.mr.sla;
import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
import org.apache.eagle.jpm.analyzer.Evaluator;
import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
import org.apache.eagle.jpm.analyzer.mr.sla.processors.LongStuckJobProcessor;
@@ -26,6 +26,7 @@ import org.apache.eagle.jpm.analyzer.mr.sla.processors.UnExpectedLongDurationJob
import org.apache.eagle.jpm.analyzer.Processor;
import org.apache.eagle.jpm.analyzer.publisher.Result;
import org.apache.eagle.jpm.analyzer.util.Utils;
+import org.apache.eagle.jpm.util.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +48,10 @@ public class SLAJobEvaluator implements Evaluator, Serializable {
@Override
public Result.EvaluatorResult evaluate(AnalyzerEntity analyzerJobEntity) {
+ if (!analyzerJobEntity.getCurrentState().equalsIgnoreCase(Constants.JobState.RUNNING.toString())) {
+ return null;
+ }
+
Result.EvaluatorResult result = new Result.EvaluatorResult();
List<JobMetaEntity> jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getJobDefId());
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
index 35f3b27..b3322ed 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
@@ -18,7 +18,7 @@
package org.apache.eagle.jpm.analyzer.mr.sla.processors;
import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
import org.apache.eagle.jpm.analyzer.Processor;
import org.apache.eagle.jpm.analyzer.publisher.Result;
import org.slf4j.Logger;
@@ -38,6 +38,6 @@ public class LongStuckJobProcessor implements Processor, Serializable {
@Override
public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) {
LOG.info("Job {} In LongStuckJobProcessor", analyzerJobEntity.getJobDefId());
- return new Result.ProcessorResult(Result.ResultLevel.NONE, "");
+ // NOTE(review): placeholder - no stuck-job check is performed yet; every job is reported
+ // under LONG_STUCK_JOB with ResultLevel.NONE and an empty message.
+ return new Result.ProcessorResult(Result.RuleType.LONG_STUCK_JOB, Result.ResultLevel.NONE, "");
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
index 9d4ce2b..8f655ba 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
@@ -18,7 +18,7 @@
package org.apache.eagle.jpm.analyzer.mr.sla.processors;
import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
import org.apache.eagle.jpm.analyzer.publisher.Result;
import org.apache.eagle.jpm.analyzer.Processor;
import org.apache.eagle.jpm.analyzer.util.Constants;
@@ -50,7 +50,7 @@ public class UnExpectedLongDurationJobProcessor implements Processor, Serializab
Map<String, Object> jobMetaData = analyzerJobEntity.getJobMeta();
long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData);
if (avgDurationTime == 0L) {
- return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE);
+ return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.NONE, Constants.PROCESS_NONE);
}
Map<Result.ResultLevel, Double> alertThreshold = Constants.DEFAULT_ALERT_THRESHOLD;
@@ -62,12 +62,12 @@ public class UnExpectedLongDurationJobProcessor implements Processor, Serializab
double expirePercent = (analyzerJobEntity.getDurationTime() - avgDurationTime) * 1.0 / avgDurationTime;
for (Map.Entry<Result.ResultLevel, Double> entry : sorted) {
if (expirePercent >= entry.getValue()) {
- return new Result.ProcessorResult(entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is %ds",
+ return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is %ds",
(int)(expirePercent * 100), avgDurationTime / 1000));
}
}
- return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE);
+ return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.NONE, Constants.PROCESS_NONE);
}
private long getAvgDuration(AnalyzerEntity mrJobAnalysisEntity, Map<String, Object> jobMetaData) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
index 79f5318..ea60ff9 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
@@ -18,15 +18,24 @@
package org.apache.eagle.jpm.analyzer.mr.suggestion;
import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.mr.historyentity.JobSuggestionAPIEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.util.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
-public class JobSuggestionEvaluator implements Evaluator, Serializable {
+import static org.apache.eagle.jpm.util.MRJobTagName.*;
+
+public class JobSuggestionEvaluator implements Evaluator<MapReduceAnalyzerEntity>, Serializable {
private static final Logger LOG = LoggerFactory.getLogger(JobSuggestionEvaluator.class);
private Config config;
@@ -35,10 +44,63 @@ public class JobSuggestionEvaluator implements Evaluator, Serializable {
this.config = config;
}
+    /**
+     * Instantiates the suggestion processors run against a finished job, all sharing one context.
+     *
+     * @param context precomputed statistics for the job under analysis
+     * @return a new mutable list of processors, in the order they are evaluated
+     */
+    private List<Processor> loadProcessors(MapReduceJobSuggestionContext context) {
+        List<Processor> enabled = new ArrayList<>(6);
+        enabled.add(new MapReduceCompressionSettingProcessor(context));
+        enabled.add(new MapReduceSplitSettingProcessor(context));
+        enabled.add(new MapReduceDataSkewProcessor(context));
+        enabled.add(new MapReduceGCTimeProcessor(context));
+        enabled.add(new MapReduceSpillProcessor(context));
+        enabled.add(new MapReduceTaskNumProcessor(context));
+        // MapReduceQueueResourceProcessor exists but is currently not registered.
+        return enabled;
+    }
+
@Override
- public Result.EvaluatorResult evaluate(AnalyzerEntity mrJobEntity) {
- Result.EvaluatorResult result = new Result.EvaluatorResult();
- //TODO
- return result;
+    public Result.EvaluatorResult evaluate(MapReduceAnalyzerEntity analyzerEntity) {
+        // Suggestions are derived from a finished job's counters and task attempts, so running jobs
+        // are skipped. The constant is the receiver so that a missing (null) state does not throw.
+        if (Constants.JobState.RUNNING.toString().equalsIgnoreCase(analyzerEntity.getCurrentState())) {
+            return null;
+        }
+
+        MapReduceJobSuggestionContext jobContext;
+        try {
+            // Building the context walks every completed task attempt; any failure there must be
+            // contained here instead of propagating out of evaluate().
+            jobContext = new MapReduceJobSuggestionContext(analyzerEntity);
+        } catch (Exception ex) {
+            LOG.error("Failed to build suggestion context for job {}", analyzerEntity.getJobId(), ex);
+            return null;
+        }
+        if (jobContext.getNumMaps() == 0) {
+            return null;
+        }
+
+        Result.EvaluatorResult result = new Result.EvaluatorResult();
+        for (Processor processor : loadProcessors(jobContext)) {
+            try {
+                Result.ProcessorResult processorResult = processor.process(analyzerEntity);
+                if (processorResult != null) {
+                    // Build the entity first so a failure never leaves a result without its entity.
+                    JobSuggestionAPIEntity suggestionEntity = createJobSuggestionEntity(processorResult, analyzerEntity);
+                    result.addProcessorResult(processor.getClass(), processorResult);
+                    result.addProcessorEntity(processor.getClass(), suggestionEntity);
+                }
+            } catch (Exception ex) {
+                // Processors are independent: one failing must not discard the others' suggestions.
+                LOG.error("Processor {} failed for job {}", processor.getClass().getSimpleName(),
+                    analyzerEntity.getJobId(), ex);
+            }
+        }
+        return result;
}
+
+    /**
+     * Wraps one processor finding into a {@link JobSuggestionAPIEntity} tagged with the job's identity.
+     *
+     * @param processorResult the finding to persist; supplies rule type, message and suggested settings
+     * @param entity          the analyzed job; supplies the tag values and the timestamp
+     * @return a populated entity whose timestamp is the job's start time
+     */
+    private static JobSuggestionAPIEntity createJobSuggestionEntity(Result.ProcessorResult processorResult, MapReduceAnalyzerEntity entity) {
+        Map<String, String> suggestionTags = buildSuggestionTags(processorResult, entity);
+        JobSuggestionAPIEntity suggestion = new JobSuggestionAPIEntity();
+        suggestion.setTags(suggestionTags);
+        // The job's start time doubles as the suggestion timestamp.
+        suggestion.setTimestamp(entity.getStartTime());
+        suggestion.setOptimizerSuggestion(processorResult.getMessage());
+        suggestion.setOptimizerSettings(processorResult.getSettings());
+        return suggestion;
+    }
+
+    /** Collects the tags identifying the job and the rule that produced the finding. */
+    private static Map<String, String> buildSuggestionTags(Result.ProcessorResult processorResult, MapReduceAnalyzerEntity entity) {
+        Map<String, String> tagMap = new HashMap<>();
+        tagMap.put(JOB_ID.toString(), entity.getJobId());
+        tagMap.put(JOD_DEF_ID.toString(), entity.getJobDefId());
+        tagMap.put(SITE.toString(), entity.getSiteId());
+        tagMap.put(USER.toString(), entity.getUserId());
+        tagMap.put(RULE_TYPE.toString(), processorResult.getRuleType().toString());
+        tagMap.put(JOB_QUEUE.toString(), entity.getJobQueueName());
+        tagMap.put(JOB_TYPE.toString(), entity.getJobType());
+        return tagMap;
+    }
+
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java
new file mode 100644
index 0000000..62c5c2b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_OUTPUT_COMPRESS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC;
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_REDUCES;
+import static org.apache.hadoop.mapreduce.MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR;
+
+/**
+ * Suggests compression settings for intermediate (map output) and final (job output) data.
+ */
+public class MapReduceCompressionSettingProcessor implements Processor<MapReduceAnalyzerEntity> {
+
+    private final MapReduceJobSuggestionContext context;
+
+    public MapReduceCompressionSettingProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Checks the job configuration held by the context and reports compression improvements:
+     * <ul>
+     *   <li>jobs with reducers whose map output is uncompressed should enable map output compression;</li>
+     *   <li>compressed map output should use LzoCodec or SnappyCodec (also reported when no codec is set);</li>
+     *   <li>uncompressed job output should be compressed;</li>
+     *   <li>Gzip/Snappy/DefaultCodec combined with a TextOutputFormat yields unsplittable output files.</li>
+     * </ul>
+     *
+     * @param jobAnalysisEntity the analyzed job (the configuration is read from the context)
+     * @return an INFO-level {@code COMPRESS} result, or {@code null} when nothing needs changing
+     */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+        List<String> optSettings = new ArrayList<>();
+
+        // The context already owns a JobConf for this job and nothing here mutates it, so no copy is needed.
+        JobConf jobconf = context.getJobconf();
+        if (jobconf.getLong(NUM_REDUCES, 0) > 0) {
+            if (!jobconf.getCompressMapOutput()) {
+                optSettings.add(String.format("%s=true", MAP_OUTPUT_COMPRESS));
+                sb.append("Please set ").append(MAP_OUTPUT_COMPRESS).append(" to true to reduce network IO.\n");
+            } else {
+                // The codec key may be absent from the stored configuration; default to "" so the
+                // check below cannot throw and still recommends a faster codec.
+                String codecClassName = jobconf.get(MAP_OUTPUT_COMPRESS_CODEC, "");
+                if (!(codecClassName.endsWith("LzoCodec") || codecClassName.endsWith("SnappyCodec"))) {
+                    optSettings.add(String.format("%s=LzoCodec or SnappyCodec", MAP_OUTPUT_COMPRESS_CODEC));
+                    sb.append("Best practice: use LzoCodec or SnappyCodec for ").append(MAP_OUTPUT_COMPRESS_CODEC).append("\n");
+                }
+            }
+        }
+
+        if (!jobconf.getBoolean(FileOutputFormat.COMPRESS, false)) {
+            optSettings.add(String.format("%s=true", FileOutputFormat.COMPRESS));
+            sb.append("Please set ").append(FileOutputFormat.COMPRESS).append(" to true to reduce disk usage and network IO.\n");
+        } else {
+            String codecName = jobconf.get(FileOutputFormat.COMPRESS_CODEC, "");
+            String outputFileFormat = jobconf.get(OUTPUT_FORMAT_CLASS_ATTR, "");
+
+            // Plain text files compressed with these codecs cannot be split by downstream readers.
+            if ((codecName.endsWith("GzipCodec") || codecName.endsWith("SnappyCodec") || codecName.endsWith("DefaultCodec"))
+                && outputFileFormat.endsWith("TextOutputFormat")) {
+                sb.append("Best practice: don't use Gzip/Snappy/DefaultCodec with TextOutputFormat");
+                sb.append(" as this will cause the output files to be unsplittable. ");
+                sb.append("Please use LZO instead or ");
+                sb.append("use a container file format such as SequenceFileOutputFormat.\n");
+            }
+        }
+
+        if (sb.length() > 0) {
+            return new Result.ProcessorResult(Result.RuleType.COMPRESS, Result.ResultLevel.INFO, sb.toString(), optSettings);
+        }
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java
new file mode 100644
index 0000000..b21a927
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+/**
+ * Flags reducer data skew: the slowest reducer's reduce phase runs far longer than the average AND
+ * it received many more input records than the per-reducer average.
+ */
+public class MapReduceDataSkewProcessor implements Processor<MapReduceAnalyzerEntity> {
+    /** Seconds by which the worst reduce time must exceed the average before skew is considered. */
+    private static final long SKEW_EXCESS_TIME_SEC = 30 * 60;
+    /** The worst reducer must receive more than this multiple of the average input records. */
+    private static final long SKEW_INPUT_RATIO = 5;
+
+    private final MapReduceJobSuggestionContext context;
+
+    public MapReduceDataSkewProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Compares the worst reducer against the job averages held by the context.
+     *
+     * @param jobAnalysisEntity the analyzed job (statistics are read from the context)
+     * @return an INFO-level {@code DATA_SKEW} result, or {@code null} when there are no reducers,
+     *     no skew is detected, or the required counters are unavailable
+     */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        TaskAttemptExecutionAPIEntity worstReduce = context.getWorstReduce();
+        if (context.getNumReduces() == 0 || worstReduce == null) {
+            return null;
+        }
+
+        // Reduce phase only: from shuffle completion to the end of the attempt.
+        long worstTimeInSec = (worstReduce.getEndTime() - worstReduce.getShuffleFinishTime()) / DateTimeUtil.ONESECOND;
+        if (worstTimeInSec - context.getAvgReduceTimeInSec() <= SKEW_EXCESS_TIME_SEC) {
+            return null;
+        }
+
+        // Counters may be missing (e.g. for failed jobs); treat that as "cannot tell" instead of
+        // relying on a caught NullPointerException, which would also hide genuine bugs.
+        Long totalInputs = counterValue(context.getJob().getReduceCounters(), JobCounters.CounterName.REDUCE_INPUT_RECORDS);
+        Long worstInputs = counterValue(worstReduce.getJobCounters(), JobCounters.CounterName.REDUCE_INPUT_RECORDS);
+        if (totalInputs == null || worstInputs == null) {
+            return null;
+        }
+        long avgInputs = totalInputs / context.getNumReduces();
+        if (worstInputs <= avgInputs * SKEW_INPUT_RATIO) {
+            return null;
+        }
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("Data skew detected in reducers. The average reduce time is ").append(context.getAvgReduceTimeInSec());
+        sb.append(" seconds, the worst reduce time is ").append(worstTimeInSec);
+        sb.append(" seconds. Please investigate this problem to improve your job performance.\n");
+        return new Result.ProcessorResult(Result.RuleType.DATA_SKEW, Result.ResultLevel.INFO, sb.toString());
+    }
+
+    /** Returns the named counter value, or {@code null} when the counters are absent. */
+    private static Long counterValue(JobCounters counters, JobCounters.CounterName name) {
+        return counters == null ? null : counters.getCounterValue(name);
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java
new file mode 100644
index 0000000..103de7a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_JAVA_OPTS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.REDUCE_JAVA_OPTS;
+
+/**
+ * Reports map and/or reduce phases whose garbage-collection time is high relative to CPU time.
+ */
+public class MapReduceGCTimeProcessor implements Processor<MapReduceAnalyzerEntity> {
+    /** GC time above this fraction of CPU time is reported as excessive. */
+    private static final double GC_CPU_RATIO_THRESHOLD = 0.1;
+
+    private final MapReduceJobSuggestionContext context;
+
+    public MapReduceGCTimeProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Checks the job's aggregated map counters, and reduce counters when the job has reducers.
+     * A phase whose counters are missing (e.g. a failed job) is skipped without affecting the other.
+     *
+     * @param jobAnalysisEntity the analyzed job (counters are read from the context)
+     * @return an INFO-level {@code GC_TIME} result, or {@code null} when no phase exceeds the threshold
+     */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+        List<String> optSettings = new ArrayList<>();
+        String setting;
+
+        if (isGcHeavy(context.getJob().getMapCounters())) {
+            setting = String.format("-D%s", MAP_JAVA_OPTS);
+            optSettings.add(setting);
+            sb.append("Map GC_TIME_MILLIS took too long. Please increase mapper memory via ").append(setting);
+            sb.append(", or optimize your mapper class.\n");
+        }
+
+        if (context.getNumReduces() > 0 && isGcHeavy(context.getJob().getReduceCounters())) {
+            setting = String.format("-D%s", REDUCE_JAVA_OPTS);
+            optSettings.add(setting);
+            sb.append("Reduce GC_TIME_MILLIS took too long. Please increase memory for reduce via ").append(setting);
+            sb.append(", or optimize your reducer class.\n");
+        }
+
+        if (sb.length() > 0) {
+            return new Result.ProcessorResult(Result.RuleType.GC_TIME, Result.ResultLevel.INFO, sb.toString(), optSettings);
+        }
+        return null;
+    }
+
+    /**
+     * Returns true when GC time exceeds {@link #GC_CPU_RATIO_THRESHOLD} of CPU time; false when the
+     * counters or either counter value are missing.
+     */
+    private static boolean isGcHeavy(JobCounters counters) {
+        if (counters == null) {
+            return false;
+        }
+        Long gcMillis = counters.getCounterValue(JobCounters.CounterName.GC_MILLISECONDS);
+        Long cpuMillis = counters.getCounterValue(JobCounters.CounterName.CPU_MILLISECONDS);
+        return gcMillis != null && cpuMillis != null && gcMillis > cpuMillis * GC_CPU_RATIO_THRESHOLD;
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java
new file mode 100644
index 0000000..1f4e548
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskType;
+
+import java.util.regex.Pattern;
+
+import static org.apache.eagle.jpm.util.jobcounter.JobCounters.CounterName.MAP_OUTPUT_BYTES;
+import static org.apache.eagle.jpm.util.jobcounter.JobCounters.CounterName.MAP_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_MAPS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_REDUCES;
+
+/**
+ * Per-job statistics shared by the MapReduce suggestion processors: task counts, average phase
+ * durations and the first/last/worst map, shuffle and reduce attempts.
+ */
+public class MapReduceJobSuggestionContext {
+
+    public static final Pattern MAX_HEAP_PATTERN = Pattern.compile("-Xmx([0-9]+)([kKmMgG]?)");
+
+    /** Bytes of per-record index/metadata overhead assumed in the map-side sort buffer. */
+    private static final long SORT_METADATA_BYTES_PER_RECORD = 16L;
+
+    private JobConf jobconf;
+    private MapReduceAnalyzerEntity job;
+
+    // Task counts as configured for the job (mapreduce.job.maps / mapreduce.job.reduces).
+    private long numMaps;
+    private long numReduces;
+
+    private long avgMapTimeInSec;
+    private long avgReduceTimeInSec;
+    private long avgShuffleTimeInSec;
+    private TaskAttemptExecutionAPIEntity worstMap;
+    private TaskAttemptExecutionAPIEntity worstReduce;
+    private TaskAttemptExecutionAPIEntity worstShuffle;
+    private TaskAttemptExecutionAPIEntity lastMap;
+    private TaskAttemptExecutionAPIEntity lastReduce;
+    private TaskAttemptExecutionAPIEntity lastShuffle;
+    private TaskAttemptExecutionAPIEntity firstMap;
+    private TaskAttemptExecutionAPIEntity firstReduce;
+    private TaskAttemptExecutionAPIEntity firstShuffle;
+
+    // Despite the name, this is the MAXIMUM over map attempts of each attempt's minimum sort-buffer
+    // requirement, i.e. the smallest buffer that would have avoided extra spills for every map.
+    private long minMapSpillMemBytes;
+
+    /**
+     * Builds the context from the job's configuration and completed task attempts.
+     *
+     * @param job the analyzed job; its configuration is copied into a {@link JobConf}
+     */
+    public MapReduceJobSuggestionContext(MapReduceAnalyzerEntity job) {
+        this.job = job;
+        this.jobconf = new JobConf(job.getJobConf());
+        buildContext();
+    }
+
+    /** Scans the completed task attempts once and derives all statistics held by this context. */
+    private void buildContext() {
+        numMaps = jobconf.getLong(NUM_MAPS, 0);
+        numReduces = jobconf.getLong(NUM_REDUCES, 0);
+
+        long totalMapMillis = 0;
+        long totalShuffleMillis = 0;
+        long totalReduceMillis = 0;
+        long completedMaps = 0;
+        long completedReduces = 0;
+
+        if (job.getCompletedTaskAttemptsMap() != null) {
+            for (TaskAttemptExecutionAPIEntity attempt : job.getCompletedTaskAttemptsMap().values()) {
+                String taskType = getTaskType(attempt);
+                if (TaskType.MAP.toString().equalsIgnoreCase(taskType)) {
+                    long mapTime = attempt.getEndTime() - attempt.getStartTime();
+                    totalMapMillis += mapTime;
+                    completedMaps++;
+                    trackMapAttempt(attempt, mapTime);
+                } else if (TaskType.REDUCE.toString().equalsIgnoreCase(taskType)) {
+                    long shuffleTime = attempt.getShuffleFinishTime() - attempt.getStartTime();
+                    long reduceTime = attempt.getEndTime() - attempt.getShuffleFinishTime();
+                    totalShuffleMillis += shuffleTime;
+                    totalReduceMillis += reduceTime;
+                    completedReduces++;
+                    trackReduceAttempt(attempt, shuffleTime, reduceTime);
+                }
+            }
+        }
+
+        // Average over the attempts actually summed. Dividing by the configured task count would be
+        // wrong whenever the number of recorded attempts differs from it (e.g. a job that failed part-way).
+        avgMapTimeInSec = completedMaps > 0 ? totalMapMillis / completedMaps / DateTimeUtil.ONESECOND : 0;
+        avgShuffleTimeInSec = completedReduces > 0 ? totalShuffleMillis / completedReduces / DateTimeUtil.ONESECOND : 0;
+        avgReduceTimeInSec = completedReduces > 0 ? totalReduceMillis / completedReduces / DateTimeUtil.ONESECOND : 0;
+    }
+
+    /** Updates the first/last/worst map trackers and the sort-memory estimate with one map attempt. */
+    private void trackMapAttempt(TaskAttemptExecutionAPIEntity attempt, long mapTime) {
+        if (firstMap == null || firstMap.getStartTime() > attempt.getStartTime()) {
+            firstMap = attempt;
+        }
+        if (lastMap == null || lastMap.getEndTime() < attempt.getEndTime()) {
+            lastMap = attempt;
+        }
+        if (worstMap == null || (worstMap.getEndTime() - worstMap.getStartTime()) < mapTime) {
+            worstMap = attempt;
+        }
+        minMapSpillMemBytes = Math.max(minMapSpillMemBytes, getMinimumIOSortMemory(attempt));
+    }
+
+    /** Updates the first/last/worst shuffle and reduce trackers with one reduce attempt. */
+    private void trackReduceAttempt(TaskAttemptExecutionAPIEntity attempt, long shuffleTime, long reduceTime) {
+        if (firstShuffle == null || firstShuffle.getStartTime() > attempt.getStartTime()) {
+            firstShuffle = attempt;
+        }
+        if (lastShuffle == null || lastShuffle.getShuffleFinishTime() < attempt.getShuffleFinishTime()) {
+            lastShuffle = attempt;
+        }
+        if (worstShuffle == null || (worstShuffle.getShuffleFinishTime() - worstShuffle.getStartTime()) < shuffleTime) {
+            worstShuffle = attempt;
+        }
+
+        if (firstReduce == null || firstReduce.getStartTime() > attempt.getStartTime()) {
+            firstReduce = attempt;
+        }
+        if (lastReduce == null || lastReduce.getEndTime() < attempt.getEndTime()) {
+            lastReduce = attempt;
+        }
+        if (worstReduce == null || (worstReduce.getEndTime() - worstReduce.getShuffleFinishTime()) < reduceTime) {
+            worstReduce = attempt;
+        }
+    }
+
+    /** Returns the attempt's TASK_TYPE tag, or {@code null} when the attempt carries no tags. */
+    private String getTaskType(TaskAttemptExecutionAPIEntity taskAttemptInfo) {
+        return taskAttemptInfo.getTags() == null ? null : taskAttemptInfo.getTags().get(MRJobTagName.TASK_TYPE.toString());
+    }
+
+    /**
+     * Estimates the sort-buffer bytes a map attempt needs to hold its whole output in one pass:
+     * output bytes plus {@value #SORT_METADATA_BYTES_PER_RECORD} bytes of metadata per output record.
+     *
+     * @param attempt a completed map attempt
+     * @return the estimated minimal sort memory, or 0 when the attempt's counters are unavailable
+     */
+    private long getMinimumIOSortMemory(TaskAttemptExecutionAPIEntity attempt) {
+        // Attempts of failed jobs may lack counters; they must not abort building the whole context.
+        if (attempt.getJobCounters() == null) {
+            return 0L;
+        }
+        Long records = attempt.getJobCounters().getCounterValue(MAP_OUTPUT_RECORDS);
+        Long outputBytes = attempt.getJobCounters().getCounterValue(MAP_OUTPUT_BYTES);
+        if (records == null || outputBytes == null) {
+            return 0L;
+        }
+        return outputBytes + records * SORT_METADATA_BYTES_PER_RECORD;
+    }
+
+    public JobConf getJobconf() {
+        return jobconf;
+    }
+
+    public MapReduceAnalyzerEntity getJob() {
+        return job;
+    }
+
+    public long getNumMaps() {
+        return numMaps;
+    }
+
+    public long getNumReduces() {
+        return numReduces;
+    }
+
+    public long getAvgMapTimeInSec() {
+        return avgMapTimeInSec;
+    }
+
+    public long getAvgReduceTimeInSec() {
+        return avgReduceTimeInSec;
+    }
+
+    public long getAvgShuffleTimeInSec() {
+        return avgShuffleTimeInSec;
+    }
+
+    public TaskAttemptExecutionAPIEntity getWorstMap() {
+        return worstMap;
+    }
+
+    public TaskAttemptExecutionAPIEntity getWorstReduce() {
+        return worstReduce;
+    }
+
+    public TaskAttemptExecutionAPIEntity getWorstShuffle() {
+        return worstShuffle;
+    }
+
+    public TaskAttemptExecutionAPIEntity getLastMap() {
+        return lastMap;
+    }
+
+    public TaskAttemptExecutionAPIEntity getLastReduce() {
+        return lastReduce;
+    }
+
+    public TaskAttemptExecutionAPIEntity getLastShuffle() {
+        return lastShuffle;
+    }
+
+    public TaskAttemptExecutionAPIEntity getFirstMap() {
+        return firstMap;
+    }
+
+    public TaskAttemptExecutionAPIEntity getFirstReduce() {
+        return firstReduce;
+    }
+
+    public TaskAttemptExecutionAPIEntity getFirstShuffle() {
+        return firstShuffle;
+    }
+
+    public long getMinMapSpillMemBytes() {
+        return minMapSpillMemBytes;
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
new file mode 100644
index 0000000..a1b57bf
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+
+/**
+ * Flags jobs whose map or reduce phase lasted far longer than the task count and average task time
+ * would predict, which hints at resource contention in the queue or cluster.
+ *
+ * <p>Criterion, per phase: {@code phaseElapsed > avgTaskTime * ceil(numTasks / 500) * 20}, where 500 is an
+ * assumed number of concurrently running tasks. The reduce phase is measured from the start of the first
+ * shuffle to the end of the last reduce, so its per-task time is the average shuffle time plus the average
+ * reduce time. Only users accepted by {@link #checkBatchUser(String)} are checked.
+ */
+public class MapReduceQueueResourceProcessor implements Processor<MapReduceAnalyzerEntity> {
+    private static final Logger LOG = LoggerFactory.getLogger(MapReduceQueueResourceProcessor.class);
+
+    /** Assumed number of tasks that run concurrently (one "wave"); ideally this would come from the RM. */
+    private static final long TASKS_PER_WAVE = 500;
+
+    /** How many times longer than the ideal schedule a phase must take before it is reported. */
+    private static final long CONTENTION_FACTOR = 20;
+
+    private final MapReduceJobSuggestionContext context;
+
+    public MapReduceQueueResourceProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Checks the map and reduce phases of the job described by the context for signs of contention.
+     *
+     * @param jobAnalysisEntity unused; all data is read from the context passed to the constructor
+     * @return an INFO-level RESOURCE_CONTENTION result, or {@code null} if nothing was detected or the
+     *     check failed (failures are logged at WARN)
+     */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        try {
+            String userName = context.getJob().getUserId();
+            TaskAttemptExecutionAPIEntity lastMap = context.getLastMap();
+            TaskAttemptExecutionAPIEntity firstMap = context.getFirstMap();
+            TaskAttemptExecutionAPIEntity lastReduce = context.getLastReduce();
+            TaskAttemptExecutionAPIEntity firstShuffle = context.getFirstShuffle();
+
+            if (checkBatchUser(userName) && lastMap != null && firstMap != null) {
+                StringBuilder sb = new StringBuilder();
+
+                long mapPhaseTimeInSec = (lastMap.getEndTime() - firstMap.getStartTime()) / DateTimeUtil.ONESECOND;
+                if (isContended(mapPhaseTimeInSec, context.getAvgMapTimeInSec(), context.getNumMaps())) {
+                    sb.append("There appears to have been resource contention during the map phase of your job. Please ask for more resources if your job is SLA-bound,");
+                    sb.append(" or submit your job when the cluster is less busy.\n");
+                }
+
+                if (context.getNumReduces() > 0 && lastReduce != null && firstShuffle != null) {
+                    long reducePhaseTimeInSec = (lastReduce.getEndTime() - firstShuffle.getStartTime()) / DateTimeUtil.ONESECOND;
+                    // The measured span starts at the first shuffle, so shuffle time belongs in the baseline too.
+                    long avgReduceTaskTimeInSec = context.getAvgShuffleTimeInSec() + context.getAvgReduceTimeInSec();
+                    if (isContended(reducePhaseTimeInSec, avgReduceTaskTimeInSec, context.getNumReduces())) {
+                        sb.append("Seems there was resource contention when your job in reduce phase, please ask for more resource if your job is SLA enabled,");
+                        sb.append(" or submit your job when the cluster is less busy.\n");
+                    }
+                }
+
+                if (sb.length() > 0) {
+                    return new Result.ProcessorResult(Result.RuleType.RESOURCE_CONTENTION, Result.ResultLevel.INFO, sb.toString());
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn(e.getMessage(), e);
+        }
+        return null;
+    }
+
+    /**
+     * Returns whether a phase took more than {@link #CONTENTION_FACTOR} times its ideal duration. The ideal
+     * duration is one average task time per wave of {@link #TASKS_PER_WAVE} tasks.
+     *
+     * <p>An average of 0 (e.g. sub-second tasks reported in whole seconds) would make every non-empty phase
+     * look contended, so the average is clamped to at least one second.
+     */
+    private static boolean isContended(long phaseTimeInSec, long avgTaskTimeInSec, long numTasks) {
+        long waves = (numTasks + TASKS_PER_WAVE - 1) / TASKS_PER_WAVE;
+        return phaseTimeInSec > Math.max(1L, avgTaskTimeInSec) * waves * CONTENTION_FACTOR;
+    }
+
+    /**
+     * Returns whether the job owner should be checked: only user ids starting with {@code "b_"} are.
+     * A missing user id is not checked.
+     */
+    protected boolean checkBatchUser(String userName) {
+        return userName != null && userName.startsWith("b_");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java
new file mode 100644
index 0000000..835b382
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import static org.apache.eagle.jpm.analyzer.mr.suggestion.MapReduceJobSuggestionContext.MAX_HEAP_PATTERN;
+import static org.apache.hadoop.mapreduce.MRJobConfig.IO_SORT_MB;
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_JAVA_OPTS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_SORT_SPILL_PERCENT;
+
+/**
+ * Check whether spilled more than once, if true, find out the minimum value of the memory to hold all the data,
+ * based on that value, find out how much memory need for heap size.
+ */
+public class MapReduceSpillProcessor implements Processor<MapReduceAnalyzerEntity> {
+
+ private MapReduceJobSuggestionContext context;
+
+ public MapReduceSpillProcessor(MapReduceJobSuggestionContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+ StringBuilder sb = new StringBuilder();
+ List<String> optSettings = new ArrayList<>();
+ String setting;
+
+ long outputRecords = 0L; // Map output records
+ long spillRecords = 0L; // Spilled Records
+ try {
+ outputRecords = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.MAP_OUTPUT_RECORDS);
+ spillRecords = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.SPILLED_RECORDS);
+
+ if (outputRecords < spillRecords) {
+ sb.append("Total map output records: ").append(outputRecords);
+ sb.append(" Total map spilled records: ").append(spillRecords).append(". Please set");
+
+ long minMapSpillMemBytes = context.getMinMapSpillMemBytes();
+ double spillPercent = context.getJobconf().getDouble(MAP_SORT_SPILL_PERCENT, 0.8);
+ if (minMapSpillMemBytes > 512 * FileUtils.ONE_MB * spillPercent) {
+ if (Math.abs(1.0 - spillPercent) > 0.001) {
+ setting = String.format("-D%s=1", MAP_SORT_SPILL_PERCENT);
+ sb.append(" ").append(setting);
+ optSettings.add(setting);
+ }
+ } else {
+ minMapSpillMemBytes /= spillPercent;
+ }
+
+ long minMapSpillMemMB = (minMapSpillMemBytes / FileUtils.ONE_MB + 10) / 10 * 10;
+ if (minMapSpillMemMB >= 2047 ) {
+ sb.append("\nPlease reduce the block size of the input files and make sure they are splittable.");
+ } else {
+ setting = String.format("-D%s=%s", IO_SORT_MB, minMapSpillMemMB);
+ sb.append(" ").append(setting);
+ optSettings.add(setting);
+ long heapSize = getMaxHeapSize(context.getJobconf().get(MAP_JAVA_OPTS));
+ if (heapSize < 3 * minMapSpillMemMB) {
+ long expectedHeapSizeMB = (minMapSpillMemMB * 3 + 1024) / 1024 * 1024;
+ setting = String.format(" -D%s=-Xmx%sM", MAP_JAVA_OPTS, expectedHeapSizeMB);
+ sb.append(" ").append(setting);
+ optSettings.add(setting);
+ }
+ }
+ sb.append(" to avoid spilled records.\n");
+ }
+
+
+ long reduceInputRecords = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.REDUCE_INPUT_RECORDS);
+ spillRecords = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.SPILLED_RECORDS);
+ if (reduceInputRecords < spillRecords) {
+ sb.append("Please add more memory (mapreduce.reduce.java.opts) to avoid spilled records.");
+ sb.append(" Total Reduce input records: ").append(reduceInputRecords);
+ sb.append(" Total Spilled Records: ").append(spillRecords);
+ sb.append("\n");
+ }
+
+ if (sb.length() > 0) {
+ return new Result.ProcessorResult(Result.RuleType.SPILL, Result.ResultLevel.INFO, sb.toString(), optSettings);
+ }
+ } catch (NullPointerException e) {
+ //When job failed there may not have counters, so just ignore it
+ }
+ return null;
+ }
+
+ private static long getMaxHeapSize(String s) {
+ Matcher m = MAX_HEAP_PATTERN.matcher(s);
+ long val = 0;
+ if (m.find()) {
+ val = Long.parseLong(m.group(1));
+ if ("k".equalsIgnoreCase(m.group(2))) {
+ val /= 1024;
+ } else if ("g".equalsIgnoreCase(m.group(2))) {
+ val *= 1024;
+ }
+ }
+ return val;
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
new file mode 100644
index 0000000..8eba468
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MapReduceSplitSettingProcessor implements Processor<MapReduceAnalyzerEntity> {
+
+ private MapReduceJobSuggestionContext context;
+
+ public MapReduceSplitSettingProcessor(MapReduceJobSuggestionContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+ StringBuilder sb = new StringBuilder();
+
+ if (context.getJobconf().getLong(FileInputFormat.SPLIT_MINSIZE, 0) > 1) {
+ sb.append("Best practice: don't set " + FileInputFormat.SPLIT_MINSIZE);
+ sb.append(", because it may lower data locality, hence maps will run slower.\n");
+ return new Result.ProcessorResult(Result.RuleType.SPLIT, Result.ResultLevel.INFO, sb.toString());
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java
new file mode 100644
index 0000000..00d5cc9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_REDUCES;
+
+/**
+ * Suggests changing the number of map or reduce tasks when per-task averages (input/output bytes,
+ * elapsed time) fall outside the ranges this processor considers healthy.
+ */
+public class MapReduceTaskNumProcessor implements Processor<MapReduceAnalyzerEntity> {
+    /** Binary size suffixes; index i stands for 1024^i bytes (up to exabytes, enough for any long). */
+    private static final String[] SIZE_UNITS = {"B", "K", "M", "G", "T", "P", "E"};
+    private final MapReduceJobSuggestionContext context;
+
+    public MapReduceTaskNumProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /**
+     * @param jobAnalysisEntity unused; all data is read from the context passed to the constructor
+     * @return an INFO-level TASK_NUMBER result with any suggested {@code -D} settings, or {@code null}
+     *     when there is nothing to suggest or counters are missing
+     */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+        List<String> optSettings = new ArrayList<>();
+        try {
+            sb.append(analyzeReduceTaskNum(optSettings));
+            sb.append(analyzeMapTaskNum(optSettings));
+
+            if (sb.length() > 0) {
+                return new Result.ProcessorResult(Result.RuleType.TASK_NUMBER, Result.ResultLevel.INFO, sb.toString(), optSettings);
+            }
+        } catch (NullPointerException ignored) {
+            // When a job failed there may not be counters, so just ignore it.
+        }
+        return null;
+    }
+
+    /**
+     * Returns reducer-count advice (possibly empty) and records a suggested reducer setting in
+     * {@code optSettings}. Every trigger shares the same estimate from {@link #getReduceNum}.
+     */
+    private String analyzeReduceTaskNum(List<String> optSettings) {
+        long numReduces = context.getNumReduces();
+        if (numReduces <= 0) {
+            return "";
+        }
+        long avgReduceTime = context.getAvgReduceTimeInSec();
+        long avgShuffleTime = context.getAvgShuffleTimeInSec();
+        long avgShuffleBytes = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.REDUCE_SHUFFLE_BYTES)
+                / numReduces;
+        long avgReduceOutput = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.HDFS_BYTES_WRITTEN)
+                / numReduces;
+        long avgReduceTotalTime = avgShuffleTime + avgReduceTime;
+
+        StringBuilder reasons = new StringBuilder();
+
+        String avgShuffleDisplaySize = bytesToHumanReadable(avgShuffleBytes);
+        boolean smallShuffle = avgShuffleBytes < 256 * FileUtils.ONE_MB && avgReduceTotalTime < 300
+                && avgReduceOutput < 256 * FileUtils.ONE_MB && numReduces > 1;
+        boolean largeShuffle = avgShuffleBytes > 10 * FileUtils.ONE_GB && avgReduceTotalTime > 1800;
+        if (smallShuffle || largeShuffle) {
+            reasons.append("average reduce input bytes is: ").append(avgShuffleDisplaySize).append(", ");
+        }
+
+        if (avgReduceTotalTime < 60 && numReduces > 1) {
+            reasons.append("average reduce time is only ").append(avgReduceTotalTime).append(" seconds, ");
+        } else if (avgReduceTotalTime > 3600 && avgReduceTime > 1800) {
+            reasons.append("average reduce time is ").append(avgReduceTotalTime).append(" seconds, ");
+        }
+
+        String avgReduceOutputDisplaySize = bytesToHumanReadable(avgReduceOutput);
+        if (avgReduceOutput < 10 * FileUtils.ONE_MB && avgReduceTime < 300
+                && avgShuffleBytes < 2 * FileUtils.ONE_GB && numReduces > 1) {
+            reasons.append("average reduce output is only ").append(avgReduceOutputDisplaySize).append(", ");
+        } else if (avgReduceOutput > 10 * FileUtils.ONE_GB && avgReduceTime > 1800) {
+            reasons.append("average reduce output is ").append(avgReduceOutputDisplaySize).append(", ");
+        }
+
+        if (reasons.length() == 0) {
+            return "";
+        }
+        long suggestReduces = getReduceNum(avgShuffleBytes, avgReduceOutput, avgReduceTime);
+        if (suggestReduces == numReduces) {
+            // The estimate matches the current setting; suggesting it again is not actionable.
+            return "";
+        }
+        String setting = String.format("-D%s=%s", NUM_REDUCES, suggestReduces);
+        optSettings.add(setting);
+        return "Best practice: " + reasons + "please consider "
+                + (suggestReduces > numReduces ? "increasing the " : "decreasing the ")
+                + "reducer number. You could try " + setting + "\n";
+    }
+
+    /**
+     * Returns mapper-count advice (possibly empty) based on average map input bytes and average map time.
+     */
+    private String analyzeMapTaskNum(List<String> optSettings) {
+        long numMaps = context.getNumMaps();
+        if (numMaps <= 0) {
+            // Nothing to average over; also avoids dividing by zero below.
+            return "";
+        }
+        long avgMapTime = context.getAvgMapTimeInSec();
+        long avgMapInput = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.HDFS_BYTES_READ)
+                / numMaps;
+        String avgMapInputDisplaySize = bytesToHumanReadable(avgMapInput);
+
+        StringBuilder sb = new StringBuilder();
+        if (avgMapInput < 5 * FileUtils.ONE_MB && avgMapTime < 30 && numMaps > 1) {
+            sb.append("Best practice: average map input bytes only have ").append(avgMapInputDisplaySize);
+            sb.append(". Please reduce the number of mappers by merging input files.\n");
+        } else if (avgMapInput > FileUtils.ONE_GB) {
+            sb.append("Best practice: average map input bytes have ").append(avgMapInputDisplaySize);
+            sb.append(". Please increase the number of mappers by using splittable compression, a container file format or a smaller block size.\n");
+        }
+
+        if (avgMapTime < 10 && numMaps > 1) {
+            sb.append("Best practice: average map time only have ").append(avgMapTime);
+            sb.append(" seconds. Please reduce the number of mappers by merging input files or by using a larger block size.\n");
+        } else if (avgMapTime > 600 && avgMapInput < FileUtils.ONE_GB) {
+            sb.append("Best practice: average map time is ").append(avgMapTime);
+            sb.append(" seconds. Please increase the number of mappers by using splittable compression, a container file format or a smaller block size.\n");
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * Estimates a reducer count that targets about 3 GB of input, 2 GB of output and 10 minutes of
+     * reduce time per reducer, taking whichever demands the most reducers (at least 1).
+     */
+    private long getReduceNum(long avgInputBytes, long avgOutputBytes, long avgTime) {
+        long numReduces = context.getNumReduces();
+        long byInput = avgInputBytes * numReduces / (3 * FileUtils.ONE_GB);
+        long byOutput = avgOutputBytes * numReduces / (2 * FileUtils.ONE_GB);
+        long byTime = avgTime * numReduces / (10 * 60);
+        return Math.max(1L, Math.max(byInput, Math.max(byOutput, byTime)));
+    }
+
+    /**
+     * Formats a byte count as the whole number of the largest binary unit plus the remainder in the next
+     * smaller unit, e.g. 1536 -> "1K512B", 1048576 -> "1M". Values below 1024 (including negatives) are
+     * printed as plain bytes.
+     */
+    private static String bytesToHumanReadable(long bytes) {
+        int idx = 0;
+        long unit = 1;
+        while (idx < SIZE_UNITS.length - 1 && bytes / unit >= 1024) {
+            unit *= 1024;
+            idx++;
+        }
+        StringBuilder sb = new StringBuilder();
+        sb.append(bytes / unit).append(SIZE_UNITS[idx]);
+        if (idx >= 1) {
+            // Remainder expressed in the next smaller unit (1024 of them per current unit).
+            long remainder = (bytes % unit) / (unit / 1024);
+            if (remainder > 0) {
+                sb.append(remainder).append(SIZE_UNITS[idx - 1]);
+            }
+        }
+        return sb.toString();
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
index 6109704..0d7d2d7 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
@@ -18,23 +18,61 @@
package org.apache.eagle.jpm.analyzer.publisher;
import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
public class EagleStorePublisher implements Publisher, Serializable {
private static final Logger LOG = LoggerFactory.getLogger(EagleStorePublisher.class);
private Config config;
+ private IEagleServiceClient client;
+ private AlertDeduplicator alertDeduplicator;
public EagleStorePublisher(Config config) {
this.config = config;
+ this.alertDeduplicator = new SimpleDeduplicator();
}
@Override
public void publish(AnalyzerEntity analyzerJobEntity, Result result) {
+ if (result.getAlertMessages().size() == 0) {
+ return;
+ }
+
+ LOG.info("EagleStorePublisher gets job {}", analyzerJobEntity.getJobDefId());
+ if (alertDeduplicator.dedup(analyzerJobEntity, result)) {
+ LOG.info("skip job {} alert because it is duplicated", analyzerJobEntity.getJobDefId());
+ return;
+ }
+
+ try {
+ this.client = new EagleServiceClientImpl(config);
+ for (Map.Entry<String, List<TaggedLogAPIEntity>> entry : result.getAlertEntities().entrySet()) {
+ client.create(entry.getValue());
+ LOG.info("successfully persist {} entities for evaluator {}", entry.getValue().size(), entry.getKey());
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ } finally {
+ if (client != null) {
+ try {
+ client.close();
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
index 4e49094..842e0ac 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
@@ -18,12 +18,10 @@
package org.apache.eagle.jpm.analyzer.publisher;
import com.typesafe.config.Config;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.eagle.app.service.ApplicationEmailService;
import org.apache.eagle.common.DateTimeUtil;
-import org.apache.eagle.common.mail.AlertEmailConstants;
import org.apache.eagle.common.mail.AlertEmailContext;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator;
import org.apache.eagle.jpm.analyzer.util.Constants;
@@ -31,7 +29,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
-import java.net.URLEncoder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -69,19 +66,15 @@ public class EmailPublisher implements Publisher, Serializable {
basic.put("end", analyzerJobEntity.getEndTime() == 0
? "0"
: DateTimeUtil.millisecondsToHumanDateWithSeconds(analyzerJobEntity.getEndTime()));
- basic.put("progress", analyzerJobEntity.getProgress() + "%");
+ double progress = analyzerJobEntity.getCurrentState().equalsIgnoreCase(org.apache.eagle.jpm.util.Constants.JobState.RUNNING.toString()) ? analyzerJobEntity.getProgress() : 100;
+ basic.put("progress", progress + "%");
basic.put("detail", getJobLink(analyzerJobEntity));
-
- Map<String, Map<String, String>> extend = new HashMap<>();
- Map<String, List<Pair<Result.ResultLevel, String>>> alertMessages = result.getAlertMessages();
- for (String evaluator : alertMessages.keySet()) {
- List<Pair<Result.ResultLevel, String>> messages = alertMessages.get(evaluator);
- extend.put(evaluator, new HashMap<>());
- for (Pair<Result.ResultLevel, String> message : messages) {
+ Map<String, List<Result.ProcessorResult>> extend = result.getAlertMessages();
+ for (String evaluator : extend.keySet()) {
+ for (Result.ProcessorResult message : extend.get(evaluator)) {
LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]",
- analyzerJobEntity.getJobDefId(), message.getRight(), message.getLeft(), evaluator);
- extend.get(evaluator).put(message.getRight(), message.getLeft().toString());
+ analyzerJobEntity.getJobDefId(), message.getMessage(), message.getResultLevel(), evaluator);
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
index 2f42bf9..1f42ef9 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
@@ -17,7 +17,7 @@
package org.apache.eagle.jpm.analyzer.publisher;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
public interface Publisher {
void publish(AnalyzerEntity analyzerJobEntity, Result result);