You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/01/20 03:16:57 UTC

[1/2] eagle git commit: [EAGLE-859] MapReduce job performance suggestion

Repository: eagle
Updated Branches:
  refs/heads/master 015d57788 -> eae6e8f11


http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
index a12f589..a9f5132 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
@@ -17,7 +17,7 @@
 
 package org.apache.eagle.jpm.analyzer.publisher;
 
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 
 import java.util.ArrayList;
@@ -27,14 +27,17 @@ import java.util.Map;
 
 public class Result {
     //for EagleStorePublisher
-    private TaggedLogAPIEntity alertEntity = null;//TODO
+    private Map<String, List<TaggedLogAPIEntity>> alertEntities = new HashMap<>();
     //for EmailPublisher
-    private Map<String, List<Pair<ResultLevel, String>>> alertMessages = new HashMap<>();
+    private Map<String, List<ProcessorResult>> alertMessages = new HashMap<>();
 
     public void addEvaluatorResult(Class<?> type, EvaluatorResult result) {
         Map<Class<?>, ProcessorResult> processorResults = result.getProcessorResults();
+        Map<Class<?>, TaggedLogAPIEntity> processorEntities = result.getProcessorEntities();
+
         for (Class<?> processorType : processorResults.keySet()) {
             ProcessorResult processorResult = processorResults.get(processorType);
+
             if (processorResult.resultLevel.equals(ResultLevel.NONE)) {
                 continue;
             }
@@ -42,17 +45,27 @@ public class Result {
             String typeName = type.getName();
             if (!alertMessages.containsKey(typeName)) {
                 alertMessages.put(typeName, new ArrayList<>());
+                alertEntities.put(typeName, new ArrayList<>());
             }
-            alertMessages.get(typeName).add(Pair.of(processorResult.getResultLevel(), processorResult.getMessage()));
+            normalizeResult(processorResult);
+            alertMessages.get(typeName).add(processorResult);
+            alertEntities.get(typeName).add(processorEntities.get(processorType));
+
         }
     }
 
-    public TaggedLogAPIEntity getAlertEntity() {
-        return alertEntity;
+    public Map<String, List<ProcessorResult>> getAlertMessages() {
+        return alertMessages;
+    }
+
+    public Map<String, List<TaggedLogAPIEntity>> getAlertEntities() {
+        return alertEntities;
     }
 
-    public Map<String, List<Pair<ResultLevel, String>>> getAlertMessages() {
-        return alertMessages;
+    private void normalizeResult(ProcessorResult processorResult) {
+        if (processorResult.getSettings() != null && !processorResult.getSettings().isEmpty()) {
+            processorResult.setSettingList(StringUtils.join(processorResult.getSettings(), "\n"));
+        }
     }
 
     /**
@@ -61,18 +74,52 @@ public class Result {
 
     public enum ResultLevel {
         NONE,
+        INFO,
         NOTICE,
         WARNING,
         CRITICAL
     }
 
+    public enum RuleType {
+        COMPRESS,
+        SPLIT,
+        SPILL,
+        TASK_NUMBER,
+        GC_TIME,
+        RESOURCE_CONTENTION,
+        DATA_SKEW,
+
+        LONG_STUCK_JOB,
+        LONG_DURATION_JOB
+    }
+
     public static class ProcessorResult {
+        private RuleType ruleType;
         private ResultLevel resultLevel;
         private String message;
+        private List<String> settings;
+        private String settingList;
 
-        public ProcessorResult(ResultLevel resultLevel, String message) {
+        public ProcessorResult(RuleType ruleType, ResultLevel resultLevel, String message, List<String> settings) {
+            this.ruleType = ruleType;
             this.resultLevel = resultLevel;
             this.message = message;
+            this.settings = settings;
+        }
+
+        public ProcessorResult(RuleType ruleType, ResultLevel resultLevel, String message) {
+            this.ruleType = ruleType;
+            this.resultLevel = resultLevel;
+            this.message = message;
+            this.settings = new ArrayList<>();
+        }
+
+        public RuleType getRuleType() {
+            return ruleType;
+        }
+
+        public void setRuleType(RuleType ruleType) {
+            this.ruleType = ruleType;
         }
 
         public ResultLevel getResultLevel() {
@@ -90,6 +137,22 @@ public class Result {
         public void setMessage(String message) {
             this.message = message;
         }
+
+        public List<String> getSettings() {
+            return settings;
+        }
+
+        public void setSettings(List<String> settings) {
+            this.settings = settings;
+        }
+
+        public String getSettingList() {
+            return settingList;
+        }
+
+        public void setSettingList(String settingList) {
+            this.settingList = settingList;
+        }
     }
 
     /**
@@ -97,13 +160,22 @@ public class Result {
      */
     public static class EvaluatorResult {
         private Map<Class<?>, ProcessorResult> processorResults = new HashMap<>();
+        private Map<Class<?>, TaggedLogAPIEntity> processorEntities = new HashMap<>();
 
         public void addProcessorResult(Class<?> type, ProcessorResult result) {
             this.processorResults.put(type, result);
         }
 
+        public void addProcessorEntity(Class<?> type, TaggedLogAPIEntity entity) {
+            this.processorEntities.put(type, entity);
+        }
+
         public Map<Class<?>, ProcessorResult> getProcessorResults() {
             return this.processorResults;
         }
+
+        public Map<Class<?>, TaggedLogAPIEntity> getProcessorEntities() {
+            return processorEntities;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
index 4b18f7c..6a51a76 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
@@ -17,7 +17,7 @@
 
 package org.apache.eagle.jpm.analyzer.publisher.dedup;
 
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
 
 public interface AlertDeduplicator {

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
index 09f1af6..b139b3c 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
@@ -17,7 +17,7 @@
 
 package org.apache.eagle.jpm.analyzer.publisher.dedup.impl;
 
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
 import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
 import org.apache.eagle.jpm.analyzer.util.Constants;

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
index 774e6d2..4c6661a 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
@@ -62,4 +62,5 @@ public class Constants {
 
     public static final String ANALYZER_REPORT_DATA_BASIC_KEY = "basic";
     public static final String ANALYZER_REPORT_DATA_EXTEND_KEY = "extend";
+
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
index 39cec68..996adba 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
@@ -115,13 +115,17 @@
 <table class="body-wrap" style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; width: 100%; background-color: #f6f6f6; margin: 0;" bgcolor="#f6f6f6" border="1">
     <caption><b>Analysis By $evaluator</b></caption>
     <tr>
-        <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="100"><b>level</b></td>
+        <th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="100"><b>type</b></th>
         <th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="250"><b>message</b></th>
+		<th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="250"><b>optimizer setting</b></th>
+		<th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="100"><b>level</b></th>
     </tr>
-    #foreach($message in ${elem["extend"].get($evaluator).keySet()})
+    #foreach($result in ${elem["extend"].get($evaluator)})
         <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
-            <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;"><b>${elem["extend"].get($evaluator).get($message)}</b></td>
-            <th style="...">$message</th>
+            <td style="...">${result.ruleType}</td>
+            <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">${result.message}</td>
+            <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">$!{result.settingList}</td>#* quiet reference: settingList is null for results without settings *#
+            <td style="...">${result.resultLevel}</td>
         </tr>
     #end
 </table>

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
index cbbdad3..8c65adf 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java
@@ -38,5 +38,6 @@ public class JPAEntityRepository extends EntityRepository {
         entitySet.add(JobProcessTimeStampEntity.class);
         entitySet.add(JobCountEntity.class);
         entitySet.add(TaskAttemptErrorCategoryEntity.class);
+        entitySet.add(JobSuggestionAPIEntity.class);
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java
new file mode 100644
index 0000000..3863a5d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java
@@ -0,0 +1,63 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.historyentity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+
+import java.util.List;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import static org.apache.eagle.jpm.util.Constants.JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("eaglejpa")
+@ColumnFamily("f")
+@Prefix("jsuggestion")
+@Service(JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME)
+@TimeSeries(true)
+@Partition({"site"})
+@Indexes({
+        @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = true),
+        @Index(name = "Index_2_jobDefId", columns = { "jobDefId" }, unique = false)
+        })
+public class JobSuggestionAPIEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private String optimizerSuggestion;
+    @Column("b")
+    private List<String> optimizerSettings;
+
+    public String getOptimizerSuggestion() {
+        return optimizerSuggestion;
+    }
+
+    public void setOptimizerSuggestion(String optimizerSuggestion) {
+        this.optimizerSuggestion = optimizerSuggestion;
+        valueChanged("optimizerSuggestion");
+    }
+
+    public List<String> getOptimizerSettings() {
+        return optimizerSettings;
+    }
+
+    public void setOptimizerSettings(List<String> optimizerSettings) {
+        this.optimizerSettings = optimizerSettings;
+        valueChanged("optimizerSettings");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
index 8db7f5c..46fcf5e 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java
@@ -46,6 +46,40 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
     private String error;
     @Column("f")
     private JobCounters jobCounters;
+    // newly added
+    @Column("g")
+    private long shuffleFinishTime;
+    @Column("h")
+    private long sortFinishTime;
+    @Column("i")
+    private long mapFinishTime;
+
+    public long getShuffleFinishTime() {
+        return shuffleFinishTime;
+    }
+
+    public void setShuffleFinishTime(long shuffleFinishTime) {
+        this.shuffleFinishTime = shuffleFinishTime;
+        valueChanged("shuffleFinishTime");
+    }
+
+    public long getSortFinishTime() {
+        return sortFinishTime;
+    }
+
+    public void setSortFinishTime(long sortFinishTime) {
+        this.sortFinishTime = sortFinishTime;
+        valueChanged("sortFinishTime");
+    }
+
+    public long getMapFinishTime() {
+        return mapFinishTime;
+    }
+
+    public void setMapFinishTime(long mapFinishTime) {
+        this.mapFinishTime = mapFinishTime;
+        valueChanged("mapFinishTime");
+    }
 
     public String getTaskStatus() {
         return taskStatus;
@@ -53,7 +87,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
 
     public void setTaskStatus(String taskStatus) {
         this.taskStatus = taskStatus;
-        pcs.firePropertyChange("taskStatus", null, null);
+        valueChanged("taskStatus");
     }
 
     public long getStartTime() {
@@ -62,7 +96,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
 
     public void setStartTime(long startTime) {
         this.startTime = startTime;
-        pcs.firePropertyChange("startTime", null, null);
+        valueChanged("startTime");
     }
 
     public long getEndTime() {
@@ -71,7 +105,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
 
     public void setEndTime(long endTime) {
         this.endTime = endTime;
-        pcs.firePropertyChange("endTime", null, null);
+        valueChanged("endTime");
     }
 
     public long getDuration() {
@@ -80,7 +114,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
 
     public void setDuration(long duration) {
         this.duration = duration;
-        pcs.firePropertyChange("duration", null, null);
+        valueChanged("duration");
     }
 
     public String getError() {
@@ -89,7 +123,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
 
     public void setError(String error) {
         this.error = error;
-        pcs.firePropertyChange("error", null, null);
+        valueChanged("error");
     }
 
     public JobCounters getJobCounters() {
@@ -98,6 +132,6 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity {
 
     public void setJobCounters(JobCounters jobCounters) {
         this.jobCounters = jobCounters;
-        pcs.firePropertyChange("jobCounters", null, null);
+        valueChanged("jobCounters");
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml
index 349c489..2e3a1a8 100644
--- a/eagle-jpm/eagle-jpm-mr-history/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml
@@ -89,6 +89,11 @@
             <version>1.6</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-analyzer</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <resources>

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
index 262054e..28ebf4e 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
@@ -21,6 +21,7 @@ package org.apache.eagle.jpm.mr.history.crawler;
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.parser.JHFParserBase;
 import org.apache.eagle.jpm.mr.history.parser.JHFParserFactory;
+import org.apache.eagle.jpm.util.MRJobTagName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +47,7 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback {
         @SuppressWarnings("serial")
         Map<String, String> baseTags = new HashMap<String, String>() {
             {
-                put("site", appConfig.getJobHistoryEndpointConfig().site);
+                put(MRJobTagName.SITE.toString(), appConfig.getJobHistoryEndpointConfig().site);
             }
         };
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
index d89937e..d58eadc 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java
@@ -81,6 +81,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
     private long sumReduceTaskDuration;
 
     private JobCounterMetricsGenerator jobCounterMetricsGenerator;
+    private JobSuggestionListener jobSuggestionListener;
 
     private MRHistoryJobConfig appConfig;
 
@@ -127,6 +128,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
 
         this.appConfig = appConfig;
         this.jobCounterMetricsGenerator = new JobCounterMetricsGenerator(appConfig.getEagleServiceConfig());
+        this.jobSuggestionListener = new JobSuggestionListener(appConfig.getConfig());
+        this.addListener(jobSuggestionListener);
     }
 
     public void register(HistoryJobEntityLifecycleListener lifecycleListener) {
@@ -179,7 +182,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
         this.jobType = jobType;
     }
 
-    protected void handleJob(EventType eventType, Map<Keys, String> values, Object totalCounters) throws Exception {
+    protected void handleJob(EventType eventType, Map<Keys, String> values, Object totalCounters, Object mapCounters, Object reduceCounters) throws Exception {
         String id = values.get(Keys.JOBID);
 
         if (jobId == null) {
@@ -300,8 +303,15 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             this.jobCounterMetricsGenerator.setBaseTags(jobExecutionEntity.getTags());
 
             formatDiagnostics(values.get(Keys.DIAGNOSTICS));
-
             entityCreated(jobExecutionEntity);
+
+            if (configuration != null && totalCounters != null) {
+                JobCounters parsedTotalCounters = parseCounters(totalCounters);
+                JobCounters parsedMapCounters = parseCounters(mapCounters);
+                JobCounters parsedReduceCounters = parseCounters(reduceCounters);
+                jobSuggestionListener.jobCountersCreated(parsedTotalCounters, parsedMapCounters, parsedReduceCounters);
+                jobSuggestionListener.jobConfigCreated(configuration);
+            }
         }
     }
 
@@ -332,7 +342,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             }
         }
 
-        super.notifiyListeners(entity);
+        super.notifyListeners(entity);
     }
 
     protected abstract JobCounters parseCounters(Object value) throws IOException;
@@ -432,7 +442,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             // it is very likely that an attempt ID could be both succeeded and failed due to M/R system
             // in this case, we should ignore this attempt?
             if (taskAttemptStartTime.get(taskAttemptID) == null) {
-                LOG.warn("task attemp has consistency issue " + taskAttemptID);
+                LOG.warn("task attempt has consistency issue " + taskAttemptID);
                 return;
             }
             entity.setStartTime(taskAttemptStartTime.get(taskAttemptID));
@@ -441,6 +451,15 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
             entity.setDuration(entity.getEndTime() - entity.getStartTime());
             entity.setTaskStatus(values.get(Keys.TASK_STATUS));
             entity.setError(values.get(Keys.ERROR));
+            if (values.get(Keys.SHUFFLE_FINISHED) != null) { // null-safe: a present key may still map to null
+                entity.setShuffleFinishTime(Long.parseLong(values.get(Keys.SHUFFLE_FINISHED)));
+            }
+            if (values.get(Keys.SORT_FINISHED) != null) {
+                entity.setSortFinishTime(Long.parseLong(values.get(Keys.SORT_FINISHED)));
+            }
+            if (values.get(Keys.MAP_FINISH_TIME) != null) {
+                entity.setMapFinishTime(Long.parseLong(values.get(Keys.MAP_FINISH_TIME)));
+            }
             if (values.get(Keys.COUNTERS) != null || counters != null) {  // when task is killed, COUNTERS does not exist
                 //entity.setJobCounters(parseCounters(values.get(Keys.COUNTERS)));
                 entity.setJobCounters(parseCounters(counters));
@@ -473,7 +492,6 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl
                 taskAttemptErrorCategoryEntity.setTimestamp(entity.getTimestamp());
                 entityCreated(taskAttemptErrorCategoryEntity);
             }
-
             taskAttemptStartTime.remove(taskAttemptID);
         } else {
             // silently ignore

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
index adeb41e..615e9ad 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
+@Deprecated
 public enum JHFFormat {
     MRVer1,
     MRVer2

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
index 8184f90..1903fc9 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java
@@ -185,7 +185,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
         if (js.getJobQueueName() != null) {
             values.put(Keys.JOB_QUEUE, js.getJobQueueName().toString());
         }
-        handleJob(wrapper.getType(), values, null);
+        handleJob(wrapper.getType(), values, null, null, null);
     }
 
     private void handleJobInited(Event wrapper) throws Exception {
@@ -209,7 +209,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
         if (js.getUberized() != null) {
             values.put(Keys.UBERISED, js.getUberized().toString());
         }
-        handleJob(wrapper.getType(), values, null);
+        handleJob(wrapper.getType(), values, null, null, null);
     }
 
     private void handleJobFinished(Event wrapper) throws Exception {
@@ -234,7 +234,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
             values.put(Keys.FAILED_REDUCES, js.getFailedReduces().toString());
         }
         values.put(Keys.JOB_STATUS, EagleJobStatus.SUCCEEDED.name());
-        handleJob(wrapper.getType(), values, js.getTotalCounters());
+        handleJob(wrapper.getType(), values, js.getTotalCounters(), js.getMapCounters(), js.getReduceCounters());
     }
 
     private void handleJobUnsuccessfulCompletion(Event wrapper) throws Exception {
@@ -258,7 +258,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
         if (js.getDiagnostics() != null) {
             values.put(Keys.DIAGNOSTICS, js.getDiagnostics().toString());
         }
-        handleJob(wrapper.getType(), values, null);
+        handleJob(wrapper.getType(), values, null, null, null);
     }
 
     private void handleTaskStarted(Event wrapper) throws Exception {
@@ -539,6 +539,9 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase {
     }
 
     protected JobCounters parseCounters(Object value) throws IOException {
+        if (value == null) {
+            return null;
+        }
         JobCounters jc = new JobCounters();
         Map<String, Map<String, Long>> groups = new HashMap<>();
         JhCounters counters = (JhCounters) value;

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
index ca49d9c..1d17640 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -38,11 +38,13 @@ public class JHFParserFactory {
         MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = appConfig.getEagleServiceConfig();
 
         JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter, appConfig);
+        // add HistoryJobEntityCreationListener
         reader2.addListener(new JobEntityCreationEagleServiceListener(appConfig));
         reader2.addListener(new TaskFailureListener(eagleServiceConfig));
         reader2.addListener(new TaskAttemptCounterListener(eagleServiceConfig));
         reader2.addListener(new JobConfigurationCreationServiceListener(eagleServiceConfig));
 
+        // add HistoryJobEntityLifecycleListener
         reader2.register(new JobEntityLifecycleAggregator());
         JHFParserBase parser = new JHFMRVer2Parser(reader2);
         return parser;

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
index a80462d..f2730fd 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java
@@ -29,7 +29,7 @@ public class JobEntityCreationPublisher {
         listeners.add(l);
     }
 
-    public void notifiyListeners(JobBaseAPIEntity entity) throws Exception {
+    public void notifyListeners(JobBaseAPIEntity entity) throws Exception {
         for (HistoryJobEntityCreationListener l : listeners) {
             l.jobEntityCreated(entity);
         }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java
new file mode 100644
index 0000000..e5b0d2e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.mr.history.parser;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
+import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.MRJobTagName;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.jpm.mr.history.parser.JHFEventReaderBase.Keys;
+import org.apache.hadoop.conf.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.eagle.jpm.util.MRJobTagName.TASK_ATTEMPT_ID;
+import static org.apache.eagle.jpm.util.MRJobTagName.TASK_ID;
+
+/*
+ * JobSuggestionListener collects job/task entities, job configuration and counters, and passes them to the job performance analyzer on flush
+ */
+public class JobSuggestionListener implements HistoryJobEntityCreationListener {
+    private static final Logger LOG = LoggerFactory.getLogger(JobSuggestionListener.class);
+
+    private MapReduceAnalyzerEntity info;
+    private MRJobPerformanceAnalyzer<MapReduceAnalyzerEntity> analyzer;
+
+    public JobSuggestionListener(Config config) {
+        this.info = new MapReduceAnalyzerEntity();
+        this.analyzer = new MRJobPerformanceAnalyzer<>(config);
+    }
+
+    @Override
+    public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception {
+        if (entity instanceof TaskExecutionAPIEntity) {
+            info.getTasksMap().put(entity.getTags().get(TASK_ID.toString()), (TaskExecutionAPIEntity) entity);
+        } else if (entity instanceof TaskAttemptExecutionAPIEntity) {
+            info.getCompletedTaskAttemptsMap().put(entity.getTags().get(TASK_ATTEMPT_ID.toString()), (TaskAttemptExecutionAPIEntity) entity);
+        } else if (entity instanceof JobExecutionAPIEntity) {
+            JobExecutionAPIEntity jobExecutionAPIEntity = (JobExecutionAPIEntity) entity;
+            info.setCurrentState(jobExecutionAPIEntity.getCurrentState());
+            info.setStartTime(jobExecutionAPIEntity.getStartTime());
+            info.setEndTime(jobExecutionAPIEntity.getEndTime());
+            info.setDurationTime(jobExecutionAPIEntity.getDurationTime());
+            info.setUserId(jobExecutionAPIEntity.getTags().get(MRJobTagName.USER.toString()));
+            info.setJobId(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_ID.toString()));
+            info.setJobDefId(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOD_DEF_ID.toString()));
+            info.setSiteId(jobExecutionAPIEntity.getTags().get(MRJobTagName.SITE.toString()));
+            info.setJobName(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_NAME.toString())) ;
+            info.setJobQueueName(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_QUEUE.toString()));
+            info.setJobType(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_TYPE.toString()));
+            info.setFinishedMaps(jobExecutionAPIEntity.getNumFinishedMaps());
+            info.setFinishedReduces(jobExecutionAPIEntity.getNumFinishedReduces());
+            info.setFailedReduces(jobExecutionAPIEntity.getNumFailedReduces());
+            info.setFailedMaps(jobExecutionAPIEntity.getNumFailedMaps());
+            info.setTotalMaps(jobExecutionAPIEntity.getNumTotalMaps());
+            info.setTotalReduces(jobExecutionAPIEntity.getNumTotalReduces());
+        }
+    }
+
+    public void jobConfigCreated(Configuration configuration) {
+        info.setJobConf(configuration);
+    }
+
+    public void jobCountersCreated(JobCounters totalCounters, JobCounters mapCounters, JobCounters reduceCounters) {
+        info.setTotalCounters(totalCounters);
+        info.setReduceCounters(reduceCounters);
+        info.setMapCounters(mapCounters);
+    }
+
+    @Override
+    public void flush() throws Exception {
+        analyzer.analyze(info);
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
index 856f051..24c734d 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
@@ -18,7 +18,6 @@
 
 package org.apache.eagle.jpm.mr.history.parser;
 
-import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.TaskAttemptCounterAPIEntity;
 import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index e8d5311..3836e3a 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -49,7 +49,7 @@
 
   "service": {
     "host": "sandbox.hortonworks.com",
-    "port": 9099,
+    "port": 9090,
     "username": "admin",
     "password": "secret",
     "readTimeOutSeconds" : 10,

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 120303e..6b33d31 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -19,7 +19,7 @@
 package org.apache.eagle.jpm.mr.running.parser;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
 import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
 import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
@@ -173,7 +173,7 @@ public class MRJobParser implements Runnable {
                     break;
                 }
             }
-            mrJobPerformanceAnalyzer.analysis(convertToAnalysisEntity(mrJobEntityMap.get(jobId)));
+            mrJobPerformanceAnalyzer.analyze(convertToAnalysisEntity(mrJobEntityMap.get(jobId)));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
index 0ba6521..4ee58a1 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -117,6 +117,7 @@ public class Constants {
     public static final String JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME = "RunningTaskExecutionService";
     public static final String JPA_RUNNING_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "RunningTaskAttemptExecutionService";
     public static final String JPA_JOB_PROCESS_TIME_STAMP_NAME = "JobProcessTimeStampService";
+    public static final String JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME = "JobOptimizerSuggestionService";
 
     public static final String JOB_TASK_TYPE_TAG = "taskType";
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
index 8def44f..bbb80cd 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java
@@ -48,7 +48,11 @@ public final class JobCounters implements Serializable {
     }
 
     public Long getCounterValue(CounterName counterName) {
-        return counters.get(counterName.group.name).get(counterName.name);
+        if (counters.get(counterName.group.name).containsKey(counterName.name)) {
+            return counters.get(counterName.group.name).get(counterName.name);
+        } else {
+            return 0L;
+        }
     }
 
     public static enum GroupName {


[2/2] eagle git commit: [EAGLE-859] MapReduce job performance suggestion

Posted by qi...@apache.org.
[EAGLE-859] MapReduce job performance suggestion

https://issues.apache.org/jira/browse/EAGLE-859

Author: Zhao, Qingwen <qi...@apache.org>
Author: Qingwen Zhao <qi...@gmail.com>

Closes #784 from qingwen220/EAGLE-859.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/eae6e8f1
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/eae6e8f1
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/eae6e8f1

Branch: refs/heads/master
Commit: eae6e8f11367c397c3a9ce39f2c59cf341f33847
Parents: 015d577
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Fri Jan 20 11:14:56 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Fri Jan 20 11:14:56 2017 +0800

----------------------------------------------------------------------
 eagle-jpm/eagle-jpm-analyzer/pom.xml            |   5 +
 .../eagle/jpm/analyzer/AnalyzerEntity.java      | 130 ------------
 .../apache/eagle/jpm/analyzer/Evaluator.java    |   5 +-
 .../apache/eagle/jpm/analyzer/JobAnalyzer.java  |  10 +-
 .../apache/eagle/jpm/analyzer/Processor.java    |   5 +-
 .../jpm/analyzer/meta/model/AnalyzerEntity.java | 130 ++++++++++++
 .../meta/model/MapReduceAnalyzerEntity.java     | 174 +++++++++++++++
 .../analyzer/mr/MRJobPerformanceAnalyzer.java   |  10 +-
 .../jpm/analyzer/mr/sla/SLAJobEvaluator.java    |   7 +-
 .../sla/processors/LongStuckJobProcessor.java   |   4 +-
 .../UnExpectedLongDurationJobProcessor.java     |   8 +-
 .../mr/suggestion/JobSuggestionEvaluator.java   |  74 ++++++-
 .../MapReduceCompressionSettingProcessor.java   |  82 ++++++++
 .../suggestion/MapReduceDataSkewProcessor.java  |  63 ++++++
 .../mr/suggestion/MapReduceGCTimeProcessor.java |  74 +++++++
 .../MapReduceJobSuggestionContext.java          | 209 +++++++++++++++++++
 .../MapReduceQueueResourceProcessor.java        |  85 ++++++++
 .../mr/suggestion/MapReduceSpillProcessor.java  | 125 +++++++++++
 .../MapReduceSplitSettingProcessor.java         |  47 +++++
 .../suggestion/MapReduceTaskNumProcessor.java   | 194 +++++++++++++++++
 .../analyzer/publisher/EagleStorePublisher.java |  40 +++-
 .../jpm/analyzer/publisher/EmailPublisher.java  |  21 +-
 .../eagle/jpm/analyzer/publisher/Publisher.java |   2 +-
 .../eagle/jpm/analyzer/publisher/Result.java    |  90 +++++++-
 .../publisher/dedup/AlertDeduplicator.java      |   2 +-
 .../dedup/impl/SimpleDeduplicator.java          |   2 +-
 .../eagle/jpm/analyzer/util/Constants.java      |   1 +
 .../main/resources/AnalyzerReportTemplate.vm    |  12 +-
 .../mr/historyentity/JPAEntityRepository.java   |   1 +
 .../historyentity/JobSuggestionAPIEntity.java   |  63 ++++++
 .../TaskAttemptExecutionAPIEntity.java          |  46 +++-
 eagle-jpm/eagle-jpm-mr-history/pom.xml          |   5 +
 .../crawler/DefaultJHFInputStreamCallback.java  |   3 +-
 .../mr/history/parser/JHFEventReaderBase.java   |  28 ++-
 .../eagle/jpm/mr/history/parser/JHFFormat.java  |   1 +
 .../mr/history/parser/JHFMRVer2EventReader.java |  11 +-
 .../jpm/mr/history/parser/JHFParserFactory.java |   2 +
 .../parser/JobEntityCreationPublisher.java      |   2 +-
 .../history/parser/JobSuggestionListener.java   |  94 +++++++++
 .../parser/TaskAttemptCounterListener.java      |   1 -
 .../src/main/resources/application.conf         |   2 +-
 .../jpm/mr/running/parser/MRJobParser.java      |   4 +-
 .../org/apache/eagle/jpm/util/Constants.java    |   1 +
 .../eagle/jpm/util/jobcounter/JobCounters.java  |   6 +-
 44 files changed, 1674 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/pom.xml b/eagle-jpm/eagle-jpm-analyzer/pom.xml
index 50fb803..07f5766 100644
--- a/eagle-jpm/eagle-jpm-analyzer/pom.xml
+++ b/eagle-jpm/eagle-jpm-analyzer/pom.xml
@@ -47,6 +47,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-entity</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-app-base</artifactId>
             <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
deleted file mode 100644
index f9b7af0..0000000
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.analyzer;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * will refactor later if other types of job needs this.
- * AnalyzerEntity for each job needed to be analysised
- */
-public class AnalyzerEntity {
-    private String jobDefId;
-    private String jobId;
-    private String siteId;
-    private String userId;
-
-    private long startTime;
-    private long endTime;
-    private long durationTime;
-    private String currentState;
-    private double progress;
-
-    private Map<String, Object> jobConfig = new HashMap<>();
-
-    private Map<String, Object> jobMeta = new HashMap<>();
-
-    public String getJobDefId() {
-        return jobDefId;
-    }
-
-    public void setJobDefId(String jobDefId) {
-        this.jobDefId = jobDefId;
-    }
-
-    public String getJobId() {
-        return jobId;
-    }
-
-    public void setJobId(String jobId) {
-        this.jobId = jobId;
-    }
-
-    public String getSiteId() {
-        return siteId;
-    }
-
-    public void setSiteId(String siteId) {
-        this.siteId = siteId;
-    }
-
-    public String getUserId() {
-        return userId;
-    }
-
-    public void setUserId(String userId) {
-        this.userId = userId;
-    }
-
-    public long getStartTime() {
-        return startTime;
-    }
-
-    public void setStartTime(long startTime) {
-        this.startTime = startTime;
-    }
-
-    public long getEndTime() {
-        return endTime;
-    }
-
-    public void setEndTime(long endTime) {
-        this.endTime = endTime;
-    }
-
-    public long getDurationTime() {
-        return durationTime;
-    }
-
-    public void setDurationTime(long durationTime) {
-        this.durationTime = durationTime;
-    }
-
-    public String getCurrentState() {
-        return currentState;
-    }
-
-    public void setCurrentState(String currentState) {
-        this.currentState = currentState;
-    }
-
-    public Map<String, Object> getJobConfig() {
-        return jobConfig;
-    }
-
-    public void setJobConfig(Map<String, Object> jobConfig) {
-        this.jobConfig = jobConfig;
-    }
-
-    public Map<String, Object> getJobMeta() {
-        return jobMeta;
-    }
-
-    public void setJobMeta(Map<String, Object> jobMeta) {
-        this.jobMeta = jobMeta;
-    }
-
-    public double getProgress() {
-        return progress;
-    }
-
-    public void setProgress(double progress) {
-        this.progress = progress;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
index 6617916..60ee8d6 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
@@ -17,8 +17,9 @@
 
 package org.apache.eagle.jpm.analyzer;
 
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
 
-public interface Evaluator {
-    Result.EvaluatorResult evaluate(AnalyzerEntity analyzerEntity);
+public interface Evaluator<T extends AnalyzerEntity> {
+    Result.EvaluatorResult evaluate(T analyzerEntity);
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
index 6cda1cd..1e9c00e 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
@@ -17,12 +17,14 @@
 
 package org.apache.eagle.jpm.analyzer;
 
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
+
 /**
- * Each JobAnalyzer contains one or more Evaluators to analysis each job.
+ * Each JobAnalyzer contains one or more Evaluators to analyze each job.
  * Each Evaluator is a group of Processors
- * Each Processor implements an algorithm or a model to analysis one dimension of a job
+ * Each Processor implements an algorithm or a model to analyze one dimension of a job
  *
  */
-public interface JobAnalyzer {
-    void analysis(AnalyzerEntity analyzerEntity) throws Exception;
+public interface JobAnalyzer<T extends AnalyzerEntity> {
+    void analyze(T analyzerEntity) throws Exception;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
index d5a8a74..419e402 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
@@ -17,8 +17,9 @@
 
 package org.apache.eagle.jpm.analyzer;
 
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
 
-public interface Processor {
-    Result.ProcessorResult process(AnalyzerEntity jobAnalysisEntity);
+public interface Processor<T extends AnalyzerEntity> {
+    Result.ProcessorResult process(T jobAnalysisEntity);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java
new file mode 100644
index 0000000..189d85d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java
@@ -0,0 +1,130 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.model;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Will be refactored later if other types of jobs need this.
+ * AnalyzerEntity holds the information for each job that needs to be analyzed.
+ */
+public class AnalyzerEntity {
+    private String jobDefId;
+    private String jobId;
+    private String siteId;
+    private String userId;
+
+    private long startTime;
+    private long endTime;
+    private long durationTime;
+    private String currentState;
+    private double progress;
+
+    private Map<String, Object> jobConfig = new HashMap<>();
+
+    private Map<String, Object> jobMeta = new HashMap<>();
+
+    public String getJobDefId() {
+        return jobDefId;
+    }
+
+    public void setJobDefId(String jobDefId) {
+        this.jobDefId = jobDefId;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public void setJobId(String jobId) {
+        this.jobId = jobId;
+    }
+
+    public String getSiteId() {
+        return siteId;
+    }
+
+    public void setSiteId(String siteId) {
+        this.siteId = siteId;
+    }
+
+    public String getUserId() {
+        return userId;
+    }
+
+    public void setUserId(String userId) {
+        this.userId = userId;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+    }
+
+    public long getDurationTime() {
+        return durationTime;
+    }
+
+    public void setDurationTime(long durationTime) {
+        this.durationTime = durationTime;
+    }
+
+    public String getCurrentState() {
+        return currentState;
+    }
+
+    public void setCurrentState(String currentState) {
+        this.currentState = currentState;
+    }
+
+    public Map<String, Object> getJobConfig() {
+        return jobConfig;
+    }
+
+    public void setJobConfig(Map<String, Object> jobConfig) {
+        this.jobConfig = jobConfig;
+    }
+
+    public Map<String, Object> getJobMeta() {
+        return jobMeta;
+    }
+
+    public void setJobMeta(Map<String, Object> jobMeta) {
+        this.jobMeta = jobMeta;
+    }
+
+    public double getProgress() {
+        return progress;
+    }
+
+    public void setProgress(double progress) {
+        this.progress = progress;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java
new file mode 100644
index 0000000..cd6249d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.model;
+
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Analyzer input describing one MapReduce job: task totals, counters, per-task data and the job configuration.
+ */
+public class MapReduceAnalyzerEntity extends AnalyzerEntity {
+    private String jobName = "";
+    private String jobQueueName = "";
+    private String jobType;
+    private int totalMaps;
+    private int totalReduces;
+    private int failedMaps;
+    private int failedReduces;
+    private int finishedMaps = 0;
+    private int finishedReduces = 0;
+    private JobCounters totalCounters;
+    private JobCounters mapCounters;
+    private JobCounters reduceCounters;
+    private Map<String, TaskExecutionAPIEntity> tasksMap = new HashMap<>();
+    private Map<String, TaskAttemptExecutionAPIEntity> completedTaskAttemptsMap = new HashMap<>();
+    private Configuration jobConf;
+
+    // Start and end time are -1 until real values are supplied.
+    public MapReduceAnalyzerEntity() {
+        setEndTime(-1);
+        setStartTime(-1);
+    }
+
+    public String getJobName() {
+        return this.jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public String getJobQueueName() {
+        return this.jobQueueName;
+    }
+
+    public void setJobQueueName(String jobQueueName) {
+        this.jobQueueName = jobQueueName;
+    }
+
+    public String getJobType() {
+        return this.jobType;
+    }
+
+    public void setJobType(String jobType) {
+        this.jobType = jobType;
+    }
+
+    public int getTotalMaps() {
+        return this.totalMaps;
+    }
+
+    public void setTotalMaps(int totalMaps) {
+        this.totalMaps = totalMaps;
+    }
+
+    public int getTotalReduces() {
+        return this.totalReduces;
+    }
+
+    public void setTotalReduces(int totalReduces) {
+        this.totalReduces = totalReduces;
+    }
+
+    public int getFailedMaps() {
+        return this.failedMaps;
+    }
+
+    public void setFailedMaps(int failedMaps) {
+        this.failedMaps = failedMaps;
+    }
+
+    public int getFailedReduces() {
+        return this.failedReduces;
+    }
+
+    public void setFailedReduces(int failedReduces) {
+        this.failedReduces = failedReduces;
+    }
+
+    public int getFinishedMaps() {
+        return this.finishedMaps;
+    }
+
+    public void setFinishedMaps(int finishedMaps) {
+        this.finishedMaps = finishedMaps;
+    }
+
+    public int getFinishedReduces() {
+        return this.finishedReduces;
+    }
+
+    public void setFinishedReduces(int finishedReduces) {
+        this.finishedReduces = finishedReduces;
+    }
+
+    public JobCounters getTotalCounters() {
+        return this.totalCounters;
+    }
+
+    public void setTotalCounters(JobCounters totalCounters) {
+        this.totalCounters = totalCounters;
+    }
+
+    public JobCounters getMapCounters() {
+        return this.mapCounters;
+    }
+
+    public void setMapCounters(JobCounters mapCounters) {
+        this.mapCounters = mapCounters;
+    }
+
+    public JobCounters getReduceCounters() {
+        return this.reduceCounters;
+    }
+
+    public void setReduceCounters(JobCounters reduceCounters) {
+        this.reduceCounters = reduceCounters;
+    }
+
+    public Map<String, TaskExecutionAPIEntity> getTasksMap() {
+        return this.tasksMap;
+    }
+
+    public void setTasksMap(Map<String, TaskExecutionAPIEntity> tasksMap) {
+        this.tasksMap = tasksMap;
+    }
+
+    public Map<String, TaskAttemptExecutionAPIEntity> getCompletedTaskAttemptsMap() {
+        return this.completedTaskAttemptsMap;
+    }
+
+    public void setCompletedTaskAttemptsMap(Map<String, TaskAttemptExecutionAPIEntity> completedTaskAttemptsMap) {
+        this.completedTaskAttemptsMap = completedTaskAttemptsMap;
+    }
+
+    public Configuration getJobConf() {
+        return this.jobConf;
+    }
+
+    public void setJobConf(Configuration jobConf) {
+        this.jobConf = jobConf;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
index e0e579a..e32a37c 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
@@ -20,6 +20,7 @@ package org.apache.eagle.jpm.analyzer.mr;
 import com.typesafe.config.Config;
 import org.apache.eagle.jpm.analyzer.*;
 import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator;
 import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator;
 import org.apache.eagle.jpm.analyzer.publisher.EagleStorePublisher;
@@ -33,7 +34,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
-public class MRJobPerformanceAnalyzer implements JobAnalyzer, Serializable {
+public class MRJobPerformanceAnalyzer<T extends AnalyzerEntity> implements JobAnalyzer<T>, Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(MRJobPerformanceAnalyzer.class);
 
     private List<Evaluator> evaluators = new ArrayList<>();
@@ -51,11 +52,14 @@ public class MRJobPerformanceAnalyzer implements JobAnalyzer, Serializable {
     }
 
     @Override
-    public void analysis(AnalyzerEntity analyzerJobEntity) throws Exception {
+    public void analyze(T analyzerJobEntity) throws Exception {
         Result result = new Result();
 
         for (Evaluator evaluator : evaluators) {
-            result.addEvaluatorResult(evaluator.getClass(), evaluator.evaluate(analyzerJobEntity));
+            Result.EvaluatorResult evaluatorResult = evaluator.evaluate(analyzerJobEntity);
+            if (evaluatorResult != null) {
+                result.addEvaluatorResult(evaluator.getClass(), evaluatorResult);
+            }
         }
 
         for (Publisher publisher : publishers) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
index f10b68d..a77e55d 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
@@ -18,7 +18,7 @@
 package org.apache.eagle.jpm.analyzer.mr.sla;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.Evaluator;
 import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
 import org.apache.eagle.jpm.analyzer.mr.sla.processors.LongStuckJobProcessor;
@@ -26,6 +26,7 @@ import org.apache.eagle.jpm.analyzer.mr.sla.processors.UnExpectedLongDurationJob
 import org.apache.eagle.jpm.analyzer.Processor;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
 import org.apache.eagle.jpm.analyzer.util.Utils;
+import org.apache.eagle.jpm.util.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +48,10 @@ public class SLAJobEvaluator implements Evaluator, Serializable {
 
     @Override
     public Result.EvaluatorResult evaluate(AnalyzerEntity analyzerJobEntity) {
+        if (!analyzerJobEntity.getCurrentState().equalsIgnoreCase(Constants.JobState.RUNNING.toString())) {
+            return null;
+        }
+
         Result.EvaluatorResult result = new Result.EvaluatorResult();
 
         List<JobMetaEntity> jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getJobDefId());

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
index 35f3b27..b3322ed 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
@@ -18,7 +18,7 @@
 package org.apache.eagle.jpm.analyzer.mr.sla.processors;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.Processor;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
 import org.slf4j.Logger;
@@ -38,6 +38,6 @@ public class LongStuckJobProcessor implements Processor, Serializable {
     @Override
     public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) {
         LOG.info("Job {} In LongStuckJobProcessor", analyzerJobEntity.getJobDefId());
-        return new Result.ProcessorResult(Result.ResultLevel.NONE, "");
+        return new Result.ProcessorResult(Result.RuleType.LONG_STUCK_JOB, Result.ResultLevel.NONE, ""); // always reports NONE with an empty message
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
index 9d4ce2b..8f655ba 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
@@ -18,7 +18,7 @@
 package org.apache.eagle.jpm.analyzer.mr.sla.processors;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
 import org.apache.eagle.jpm.analyzer.Processor;
 import org.apache.eagle.jpm.analyzer.util.Constants;
@@ -50,7 +50,7 @@ public class UnExpectedLongDurationJobProcessor implements Processor, Serializab
         Map<String, Object> jobMetaData = analyzerJobEntity.getJobMeta();
         long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData);
         if (avgDurationTime == 0L) {
-            return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE);
+            return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.NONE, Constants.PROCESS_NONE);
         }
 
         Map<Result.ResultLevel, Double> alertThreshold = Constants.DEFAULT_ALERT_THRESHOLD;
@@ -62,12 +62,12 @@ public class UnExpectedLongDurationJobProcessor implements Processor, Serializab
         double expirePercent = (analyzerJobEntity.getDurationTime() - avgDurationTime) * 1.0 / avgDurationTime;
         for (Map.Entry<Result.ResultLevel, Double> entry : sorted) {
             if (expirePercent >= entry.getValue()) {
-                return new Result.ProcessorResult(entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is %ds",
+                return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is %ds",
                         (int)(expirePercent * 100), avgDurationTime / 1000));
             }
         }
 
-        return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE);
+        return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.NONE, Constants.PROCESS_NONE);
     }
 
     private long getAvgDuration(AnalyzerEntity mrJobAnalysisEntity, Map<String, Object> jobMetaData) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
index 79f5318..ea60ff9 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
@@ -18,15 +18,24 @@
 package org.apache.eagle.jpm.analyzer.mr.suggestion;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.mr.historyentity.JobSuggestionAPIEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.util.Constants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
-public class JobSuggestionEvaluator implements Evaluator, Serializable {
+import static org.apache.eagle.jpm.util.MRJobTagName.*;
+
+public class JobSuggestionEvaluator implements Evaluator<MapReduceAnalyzerEntity>, Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(JobSuggestionEvaluator.class);
 
     private Config config;
@@ -35,10 +44,63 @@ public class JobSuggestionEvaluator implements Evaluator, Serializable {
         this.config = config;
     }
 
+    // Builds the suggestion processors in the order they are run; every processor shares the given context.
+    private List<Processor> loadProcessors(MapReduceJobSuggestionContext context) {
+        List<Processor> suggestionProcessors = new ArrayList<>();
+        suggestionProcessors.add(new MapReduceCompressionSettingProcessor(context));
+        suggestionProcessors.add(new MapReduceSplitSettingProcessor(context));
+        suggestionProcessors.add(new MapReduceDataSkewProcessor(context));
+        suggestionProcessors.add(new MapReduceGCTimeProcessor(context));
+        suggestionProcessors.add(new MapReduceSpillProcessor(context));
+        suggestionProcessors.add(new MapReduceTaskNumProcessor(context));
+        //suggestionProcessors.add(new MapReduceQueueResourceProcessor(context));
+        return suggestionProcessors;
+    }
+
     @Override
-    public Result.EvaluatorResult evaluate(AnalyzerEntity mrJobEntity) {
-        Result.EvaluatorResult result = new Result.EvaluatorResult();
-        //TODO
-        return result;
+    public Result.EvaluatorResult evaluate(MapReduceAnalyzerEntity analyzerEntity) {
+        if (analyzerEntity.getCurrentState().equalsIgnoreCase(Constants.JobState.RUNNING.toString())) {
+            return null;
+        }
+
+        MapReduceJobSuggestionContext jobContext = new MapReduceJobSuggestionContext(analyzerEntity);
+        if (jobContext.getNumMaps() == 0) {
+            return null;
+        }
+
+        try {
+            Result.EvaluatorResult result = new Result.EvaluatorResult();
+            for (Processor processor : loadProcessors(jobContext)) {
+                Result.ProcessorResult processorResult = processor.process(analyzerEntity);
+                if (processorResult != null) {
+                    result.addProcessorResult(processor.getClass(), processorResult);
+                    result.addProcessorEntity(processor.getClass(), createJobSuggestionEntity(processorResult, analyzerEntity));
+                }
+            }
+            return result;
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
+            return null;
+        }
+
     }
+
+    // Wraps one processor finding in a JobSuggestionAPIEntity tagged with the job's ids, site, user, rule type, queue and type.
+    private static JobSuggestionAPIEntity createJobSuggestionEntity(Result.ProcessorResult processorResult, MapReduceAnalyzerEntity entity) {
+        Map<String, String> suggestionTags = new HashMap<>();
+        suggestionTags.put(JOB_ID.toString(), entity.getJobId());
+        suggestionTags.put(JOD_DEF_ID.toString(), entity.getJobDefId());
+        suggestionTags.put(SITE.toString(), entity.getSiteId());
+        suggestionTags.put(USER.toString(), entity.getUserId());
+        suggestionTags.put(RULE_TYPE.toString(), processorResult.getRuleType().toString());
+        suggestionTags.put(JOB_QUEUE.toString(), entity.getJobQueueName());
+        suggestionTags.put(JOB_TYPE.toString(), entity.getJobType());
+        JobSuggestionAPIEntity suggestion = new JobSuggestionAPIEntity();
+        suggestion.setTags(suggestionTags);
+        suggestion.setTimestamp(entity.getStartTime());  // startTime as the job timestamp
+        suggestion.setOptimizerSuggestion(processorResult.getMessage());
+        suggestion.setOptimizerSettings(processorResult.getSettings());
+        return suggestion;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java
new file mode 100644
index 0000000..62c5c2b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_OUTPUT_COMPRESS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC;
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_REDUCES;
+import static org.apache.hadoop.mapreduce.MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR;
+
+/** Suggests compression settings for the map output and the final output of a MapReduce job. */
+public class MapReduceCompressionSettingProcessor implements Processor<MapReduceAnalyzerEntity> {
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceCompressionSettingProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Inspects the job configuration held by the context (the entity argument itself is not read).
+     * Returns a COMPRESS result carrying the advice and suggested settings, or null when there is no advice.
+     */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+        List<String> optSettings = new ArrayList<>();
+        JobConf jobconf = new JobConf(context.getJobconf());
+        if (jobconf.getLong(NUM_REDUCES, 0) > 0) {
+            if (!jobconf.getCompressMapOutput()) {
+                optSettings.add(String.format("%s=true", MAP_OUTPUT_COMPRESS));
+                sb.append("Please set " + MAP_OUTPUT_COMPRESS + " to true to reduce network IO.\n");
+            } else {
+                // The codec key may be unset even though compression is on; default to "" so endsWith cannot throw.
+                String codecClassName = jobconf.get(MAP_OUTPUT_COMPRESS_CODEC, "");
+                if (!(codecClassName.endsWith("LzoCodec") || codecClassName.endsWith("SnappyCodec"))) {
+                    optSettings.add(String.format("%s=LzoCodec or SnappyCodec", MAP_OUTPUT_COMPRESS_CODEC));
+                    sb.append("Best practice: use LzoCodec or SnappyCodec for " + MAP_OUTPUT_COMPRESS_CODEC).append("\n");
+                }
+            }
+        }
+
+        if (!jobconf.getBoolean(FileOutputFormat.COMPRESS, false)) {
+            optSettings.add(String.format("%s=true", FileOutputFormat.COMPRESS));
+            sb.append("Please set " + FileOutputFormat.COMPRESS + " to true to reduce disk usage and network IO.\n");
+        } else {
+            String codecName = jobconf.get(FileOutputFormat.COMPRESS_CODEC, "");
+            String outputFileFormat = jobconf.get(OUTPUT_FORMAT_CLASS_ATTR, "");
+            if ((codecName.endsWith("GzipCodec") || codecName.endsWith("SnappyCodec") || codecName.endsWith("DefaultCodec"))
+                && outputFileFormat.endsWith("TextOutputFormat")) {
+                sb.append("Best practice: don't use Gzip/Snappy/DefaultCodec with TextOutputFormat as this will cause the output files to be unsplittable. ");
+                sb.append("Please use LZO instead or use a container file format such as SequenceFileOutputFormat.\n");
+            }
+        }
+        if (sb.length() > 0) {
+            return new Result.ProcessorResult(Result.RuleType.COMPRESS, Result.ResultLevel.INFO, sb.toString(), optSettings);
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java
new file mode 100644
index 0000000..b21a927
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+/** Flags reducer data skew: the slowest reduce attempt runs far longer and reads far more records than average. */
+public class MapReduceDataSkewProcessor implements Processor<MapReduceAnalyzerEntity> {
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceDataSkewProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /** Returns a DATA_SKEW result when the worst reduce's time after shuffle exceeds the average reduce time by more
+     *  than 30 * 60 seconds and it read more than 5x the average REDUCE_INPUT_RECORDS; returns null otherwise. */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        TaskAttemptExecutionAPIEntity slowestReduce = context.getWorstReduce();
+        if (context.getNumReduces() == 0 || slowestReduce == null) {
+            return null;
+        }
+        try {
+            long slowestSeconds = (slowestReduce.getEndTime() - slowestReduce.getShuffleFinishTime()) / DateTimeUtil.ONESECOND;
+            if (slowestSeconds - context.getAvgReduceTimeInSec() <= 30 * 60) {
+                return null;
+            }
+            long meanInputs = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.REDUCE_INPUT_RECORDS)
+                / context.getNumReduces();
+            long slowestInputs = slowestReduce.getJobCounters().getCounterValue(JobCounters.CounterName.REDUCE_INPUT_RECORDS);
+            if (slowestInputs <= meanInputs * 5) {
+                return null;
+            }
+            String message = "Data skew detected in reducers. The average reduce time is " + context.getAvgReduceTimeInSec()
+                + " seconds, the worst reduce time is " + slowestSeconds
+                + " seconds. Please investigate this problem to improve your job performance.\n";
+            return new Result.ProcessorResult(Result.RuleType.DATA_SKEW, Result.ResultLevel.INFO, message);
+        } catch (NullPointerException ignored) {
+            // A failed job may have no counters; in that case no suggestion is made.
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java
new file mode 100644
index 0000000..103de7a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_JAVA_OPTS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.REDUCE_JAVA_OPTS;
+
+/** Suggests more task memory when GC time exceeds 10% of CPU time for the job's maps or reduces. */
+public class MapReduceGCTimeProcessor implements Processor<MapReduceAnalyzerEntity> {
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceGCTimeProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /** Returns a GC_TIME result listing the JVM option settings to tune, or null when nothing is flagged. */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder advice = new StringBuilder();
+        List<String> settings = new ArrayList<>();
+        try {
+            long mapGcMillis = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.GC_MILLISECONDS);
+            long mapCpuMillis = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.CPU_MILLISECONDS);
+            if (mapGcMillis > mapCpuMillis * 0.1) {
+                String mapOption = String.format("-D%s", MAP_JAVA_OPTS);
+                settings.add(mapOption);
+                advice.append("Map GC_TIME_MILLIS took too long. Please increase mapper memory via ").append(mapOption)
+                    .append(", or optimize your mapper class.\n");
+            }
+
+            if (context.getNumReduces() > 0) {
+                long reduceGcMillis = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.GC_MILLISECONDS);
+                long reduceCpuMillis = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.CPU_MILLISECONDS);
+                if (reduceGcMillis > reduceCpuMillis * 0.1) {
+                    String reduceOption = String.format("-D%s", REDUCE_JAVA_OPTS);
+                    settings.add(reduceOption);
+                    advice.append("Reduce GC_TIME_MILLIS took too long. Please increase memory for reduce via ").append(reduceOption)
+                        .append(", or optimize your reducer class.\n");
+                }
+            }
+
+            if (advice.length() == 0) {
+                return null;
+            }
+            return new Result.ProcessorResult(Result.RuleType.GC_TIME, Result.ResultLevel.INFO, advice.toString(), settings);
+        } catch (NullPointerException ignored) {
+            // A failed job may have no counters; in that case no suggestion is made.
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java
new file mode 100644
index 0000000..1f4e548
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskType;
+
+import java.util.regex.Pattern;
+
+import static org.apache.eagle.jpm.util.jobcounter.JobCounters.CounterName.MAP_OUTPUT_BYTES;
+import static org.apache.eagle.jpm.util.jobcounter.JobCounters.CounterName.MAP_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_MAPS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_REDUCES;
+
+/**
+ * Per-job statistics shared by the MapReduce suggestion processors.
+ *
+ * <p>On construction the context walks every entry of
+ * {@code job.getCompletedTaskAttemptsMap()} once and derives, for the map, shuffle and
+ * reduce phases, the average duration plus the first, last and longest-running attempt.
+ * It also records the largest map-side sort-buffer requirement seen in any map attempt.
+ *
+ * <p>NOTE(review): an attempt without counters would make the constructor throw a
+ * {@link NullPointerException} (see {@link #getMinimumIOSortMemory}) - confirm callers
+ * expect that.
+ */
+public class MapReduceJobSuggestionContext {
+
+    /** Matches a {@code -Xmx} JVM option: group 1 is the number, group 2 the optional k/m/g unit. */
+    public static final Pattern MAX_HEAP_PATTERN = Pattern.compile("-Xmx([0-9]+)([kKmMgG]?)");
+
+    private JobConf jobconf;
+    private MapReduceAnalyzerEntity job;
+
+    // Task counts as declared in the job configuration (0 when the key is absent).
+    private long numMaps;
+    private long numReduces;
+
+    // While the context is being built these hold running sums of raw durations; they are
+    // divided by the task count and by DateTimeUtil.ONESECOND once all attempts are seen.
+    private long avgMapTimeInSec;
+    private long avgReduceTimeInSec;
+    private long avgShuffleTimeInSec;
+
+    // Attempt with the longest duration in each phase.
+    private TaskAttemptExecutionAPIEntity worstMap;
+    private TaskAttemptExecutionAPIEntity worstReduce;
+    private TaskAttemptExecutionAPIEntity worstShuffle;
+
+    // Attempt that finished each phase last.
+    private TaskAttemptExecutionAPIEntity lastMap;
+    private TaskAttemptExecutionAPIEntity lastReduce;
+    private TaskAttemptExecutionAPIEntity lastShuffle;
+
+    // Attempt with the earliest start time in each phase.
+    private TaskAttemptExecutionAPIEntity firstMap;
+    private TaskAttemptExecutionAPIEntity firstReduce;
+    private TaskAttemptExecutionAPIEntity firstShuffle;
+
+    // Largest value of getMinimumIOSortMemory() over all map attempts.
+    private long minMapSpillMemBytes;
+
+    /**
+     * Creates the context and immediately computes all statistics.
+     *
+     * @param job the analyzed job; its configuration and completed task attempts are read
+     */
+    public MapReduceJobSuggestionContext(MapReduceAnalyzerEntity job) {
+        this.job = job;
+        this.jobconf = new JobConf(job.getJobConf());
+        buildContext();
+    }
+
+    /**
+     * Scans all completed task attempts and fills in every statistic of this context.
+     *
+     * @return this context
+     */
+    private MapReduceJobSuggestionContext buildContext() {
+        avgMapTimeInSec = 0;
+        avgReduceTimeInSec = 0;
+        avgShuffleTimeInSec = 0;
+        numMaps = jobconf.getLong(NUM_MAPS, 0);
+        numReduces = jobconf.getLong(NUM_REDUCES, 0);
+
+        for (TaskAttemptExecutionAPIEntity attempt : job.getCompletedTaskAttemptsMap().values()) {
+            String taskType = getTaskType(attempt);
+            if (Constants.TaskType.MAP.toString().equalsIgnoreCase(taskType)) {
+                collectMapAttempt(attempt);
+            } else if (TaskType.REDUCE.toString().equalsIgnoreCase(taskType)) {
+                collectReduceAttempt(attempt);
+            }
+        }
+
+        // The sums are averaged over the *configured* task counts; with a count of zero
+        // the raw sum is left untouched.
+        if (numMaps > 0) {
+            avgMapTimeInSec = avgMapTimeInSec / numMaps / DateTimeUtil.ONESECOND;
+        }
+        if (numReduces > 0) {
+            avgReduceTimeInSec = avgReduceTimeInSec / numReduces / DateTimeUtil.ONESECOND;
+            avgShuffleTimeInSec = avgShuffleTimeInSec / numReduces / DateTimeUtil.ONESECOND;
+        }
+        return this;
+    }
+
+    /** Folds one map attempt into the map-phase statistics. */
+    private void collectMapAttempt(TaskAttemptExecutionAPIEntity attempt) {
+        long elapsed = attempt.getEndTime() - attempt.getStartTime();
+        avgMapTimeInSec += elapsed;
+
+        if (firstMap == null || attempt.getStartTime() < firstMap.getStartTime()) {
+            firstMap = attempt;
+        }
+        if (lastMap == null || attempt.getEndTime() > lastMap.getEndTime()) {
+            lastMap = attempt;
+        }
+        if (worstMap == null || elapsed > worstMap.getEndTime() - worstMap.getStartTime()) {
+            worstMap = attempt;
+        }
+
+        minMapSpillMemBytes = Math.max(minMapSpillMemBytes, getMinimumIOSortMemory(attempt));
+    }
+
+    /**
+     * Folds one reduce attempt into both the shuffle-phase statistics (start to shuffle
+     * finish) and the reduce-phase statistics (shuffle finish to end).
+     */
+    private void collectReduceAttempt(TaskAttemptExecutionAPIEntity attempt) {
+        long shuffleElapsed = attempt.getShuffleFinishTime() - attempt.getStartTime();
+        avgShuffleTimeInSec += shuffleElapsed;
+
+        if (firstShuffle == null || attempt.getStartTime() < firstShuffle.getStartTime()) {
+            firstShuffle = attempt;
+        }
+        if (lastShuffle == null || attempt.getShuffleFinishTime() > lastShuffle.getShuffleFinishTime()) {
+            lastShuffle = attempt;
+        }
+        if (worstShuffle == null
+            || shuffleElapsed > worstShuffle.getShuffleFinishTime() - worstShuffle.getStartTime()) {
+            worstShuffle = attempt;
+        }
+
+        long reduceElapsed = attempt.getEndTime() - attempt.getShuffleFinishTime();
+        avgReduceTimeInSec += reduceElapsed;
+
+        // Like firstShuffle, firstReduce is chosen by attempt start time.
+        if (firstReduce == null || attempt.getStartTime() < firstReduce.getStartTime()) {
+            firstReduce = attempt;
+        }
+        if (lastReduce == null || attempt.getEndTime() > lastReduce.getEndTime()) {
+            lastReduce = attempt;
+        }
+        if (worstReduce == null
+            || reduceElapsed > worstReduce.getEndTime() - worstReduce.getShuffleFinishTime()) {
+            worstReduce = attempt;
+        }
+    }
+
+    /** Returns the value of the attempt's task-type tag, or {@code null} when the tag is absent. */
+    private String getTaskType(TaskAttemptExecutionAPIEntity taskAttemptInfo) {
+        return taskAttemptInfo.getTags().get(MRJobTagName.TASK_TYPE.toString());
+    }
+
+    /**
+     * Estimates the sort-buffer size needed to hold the whole output of one map attempt:
+     * the output bytes plus 16 bytes of index per output record.
+     *
+     * @param attempt a map task attempt
+     * @return minimal sort memory in bytes
+     */
+    private long getMinimumIOSortMemory(TaskAttemptExecutionAPIEntity attempt) {
+        long outputRecords = attempt.getJobCounters().getCounterValue(MAP_OUTPUT_RECORDS);
+        long outputBytes = attempt.getJobCounters().getCounterValue(MAP_OUTPUT_BYTES);
+        return outputBytes + outputRecords * 16;
+    }
+
+    /** Returns the job configuration built from the analyzed job. */
+    public JobConf getJobconf() {
+        return jobconf;
+    }
+
+    /** Returns the analyzed job. */
+    public MapReduceAnalyzerEntity getJob() {
+        return job;
+    }
+
+    /** Returns the configured number of map tasks. */
+    public long getNumMaps() {
+        return numMaps;
+    }
+
+    /** Returns the configured number of reduce tasks. */
+    public long getNumReduces() {
+        return numReduces;
+    }
+
+    /** Returns the average map attempt duration, truncated to whole seconds. */
+    public long getAvgMapTimeInSec() {
+        return avgMapTimeInSec;
+    }
+
+    /** Returns the average reduce-phase duration (after shuffle), truncated to whole seconds. */
+    public long getAvgReduceTimeInSec() {
+        return avgReduceTimeInSec;
+    }
+
+    /** Returns the average shuffle-phase duration, truncated to whole seconds. */
+    public long getAvgShuffleTimeInSec() {
+        return avgShuffleTimeInSec;
+    }
+
+    /** Returns the longest-running map attempt, or {@code null} if there was none. */
+    public TaskAttemptExecutionAPIEntity getWorstMap() {
+        return worstMap;
+    }
+
+    /** Returns the attempt with the longest reduce phase, or {@code null} if there was none. */
+    public TaskAttemptExecutionAPIEntity getWorstReduce() {
+        return worstReduce;
+    }
+
+    /** Returns the attempt with the longest shuffle phase, or {@code null} if there was none. */
+    public TaskAttemptExecutionAPIEntity getWorstShuffle() {
+        return worstShuffle;
+    }
+
+    /** Returns the map attempt with the latest end time, or {@code null} if there was none. */
+    public TaskAttemptExecutionAPIEntity getLastMap() {
+        return lastMap;
+    }
+
+    /** Returns the reduce attempt with the latest end time, or {@code null} if there was none. */
+    public TaskAttemptExecutionAPIEntity getLastReduce() {
+        return lastReduce;
+    }
+
+    /** Returns the reduce attempt whose shuffle finished last, or {@code null} if there was none. */
+    public TaskAttemptExecutionAPIEntity getLastShuffle() {
+        return lastShuffle;
+    }
+
+    /** Returns the map attempt with the earliest start time, or {@code null} if there was none. */
+    public TaskAttemptExecutionAPIEntity getFirstMap() {
+        return firstMap;
+    }
+
+    /** Returns the reduce attempt with the earliest start time, or {@code null} if there was none. */
+    public TaskAttemptExecutionAPIEntity getFirstReduce() {
+        return firstReduce;
+    }
+
+    /** Returns the reduce attempt with the earliest start time, or {@code null} if there was none. */
+    public TaskAttemptExecutionAPIEntity getFirstShuffle() {
+        return firstShuffle;
+    }
+
+    /** Returns the largest per-attempt sort-buffer requirement of any map attempt, in bytes. */
+    public long getMinMapSpillMemBytes() {
+        return minMapSpillMemBytes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
new file mode 100644
index 0000000..a1b57bf
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+
+/**
+ * Reports probable queue resource contention for jobs submitted by batch users.
+ *
+ * <p>Criterion: {@code TimeElapsed / (numTasks / 500 * avgTaskTime) > 20}, i.e. a phase is
+ * flagged when its wall-clock time is more than 20 times what it would take if the tasks
+ * ran in waves of 500 at the average task duration.
+ */
+public class MapReduceQueueResourceProcessor implements Processor<MapReduceAnalyzerEntity> {
+    private static final Logger LOG = LoggerFactory.getLogger(MapReduceQueueResourceProcessor.class);
+
+    /** Number of tasks assumed to run concurrently; better get it from RM. */
+    private static final long TASKS_PER_WAVE = 500;
+    /** A phase slower than this multiple of its expected duration is considered contended. */
+    private static final long SLOWDOWN_FACTOR = 20;
+
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceQueueResourceProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Checks the map phase and, when the job has reducers, the reduce phase for contention.
+     *
+     * @param jobAnalysisEntity the analyzed job (the statistics are read from the context)
+     * @return an INFO result describing the contended phases, or {@code null} when the job
+     *         is not owned by a batch user, has no map attempts, shows no contention, or
+     *         the evaluation failed (the failure is logged)
+     */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        try {
+            String userName = context.getJob().getUserId();
+            TaskAttemptExecutionAPIEntity lastMap = context.getLastMap();
+            TaskAttemptExecutionAPIEntity firstMap = context.getFirstMap();
+            TaskAttemptExecutionAPIEntity lastReduce = context.getLastReduce();
+            TaskAttemptExecutionAPIEntity firstShuffle = context.getFirstShuffle();
+
+            if (!checkBatchUser(userName) || lastMap == null || firstMap == null) {
+                return null;
+            }
+
+            StringBuilder sb = new StringBuilder();
+
+            long mapPhaseTimeInSec = (lastMap.getEndTime() - firstMap.getStartTime()) / DateTimeUtil.ONESECOND;
+            if (isContended(mapPhaseTimeInSec, context.getAvgMapTimeInSec(), context.getNumMaps())) {
+                sb.append("There appears to have been resource contention during the map phase of your job. Please ask for more resources if your job is SLA-bound,");
+                sb.append(" or submit your job when the cluster is less busy.\n");
+            }
+
+            if (context.getNumReduces() > 0 && lastReduce != null && firstShuffle != null) {
+                // The reduce phase is measured from the first shuffle start to the last reduce end.
+                long reducePhaseTimeInSec = (lastReduce.getEndTime() - firstShuffle.getStartTime()) / DateTimeUtil.ONESECOND;
+                if (isContended(reducePhaseTimeInSec, context.getAvgReduceTimeInSec(), context.getNumReduces())) {
+                    sb.append("There appears to have been resource contention during the reduce phase of your job. Please ask for more resources if your job is SLA-bound,");
+                    sb.append(" or submit your job when the cluster is less busy.\n");
+                }
+            }
+
+            if (sb.length() > 0) {
+                return new Result.ProcessorResult(Result.RuleType.RESOURCE_CONTENTION, Result.ResultLevel.INFO, sb.toString());
+            }
+        } catch (Exception e) {
+            LOG.warn(e.getMessage(), e);
+        }
+        return null;
+    }
+
+    /**
+     * Applies the contention criterion to one phase.
+     *
+     * <p>Both the average task time and the wave count are clamped to at least 1. The
+     * average is a whole-second value, so it is 0 for sub-second tasks; without the clamp
+     * the threshold would be 0 and every phase lasting a second or more would be flagged.
+     *
+     * @param phaseTimeInSec wall-clock duration of the phase in seconds
+     * @param avgTaskTimeInSec average task duration in whole seconds
+     * @param numTasks number of tasks in the phase
+     * @return {@code true} when the phase took longer than the expected duration times
+     *         {@link #SLOWDOWN_FACTOR}
+     */
+    private static boolean isContended(long phaseTimeInSec, long avgTaskTimeInSec, long numTasks) {
+        long waves = Math.max(1, (numTasks + TASKS_PER_WAVE - 1) / TASKS_PER_WAVE);
+        long avgTaskTime = Math.max(1, avgTaskTimeInSec);
+        return phaseTimeInSec > avgTaskTime * waves * SLOWDOWN_FACTOR;
+    }
+
+    /**
+     * Tells whether the user is a batch account, recognized by the {@code b_} name prefix.
+     *
+     * @param userName the job owner; may be {@code null}
+     * @return {@code true} for batch accounts, {@code false} otherwise (including {@code null})
+     */
+    protected boolean checkBatchUser(String userName) {
+        return userName != null && userName.startsWith("b_");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java
new file mode 100644
index 0000000..835b382
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import static org.apache.eagle.jpm.analyzer.mr.suggestion.MapReduceJobSuggestionContext.MAX_HEAP_PATTERN;
+import static org.apache.hadoop.mapreduce.MRJobConfig.IO_SORT_MB;
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_JAVA_OPTS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_SORT_SPILL_PERCENT;
+
+/**
+ * Check whether spilled more than once, if true, find out the minimum value of the memory to hold all the data,
+ * based on that value, find out how much memory need for heap size.
+ *
+ * <p>Map side: when more records were spilled than produced, the processor suggests a
+ * sort buffer ({@code io.sort.mb}) large enough for the biggest map output seen, and a
+ * heap of at least three times that buffer. Reduce side: when more records were spilled
+ * than read, it suggests adding reducer memory.
+ */
+public class MapReduceSpillProcessor implements Processor<MapReduceAnalyzerEntity> {
+
+    /** Largest sort buffer, in MB, this processor will suggest. */
+    private static final long MAX_IO_SORT_MB = 2047;
+
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceSpillProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Evaluates map-side and reduce-side spilling of the job held by the context.
+     *
+     * @param jobAnalysisEntity the analyzed job (the statistics are read from the context)
+     * @return an INFO result with the advice text and the suggested {@code -D} settings,
+     *         or {@code null} when nothing was spilled excessively or counters are missing
+     */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+        List<String> optSettings = new ArrayList<>();
+
+        try {
+            long outputRecords = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.MAP_OUTPUT_RECORDS);
+            long spillRecords = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.SPILLED_RECORDS);
+
+            if (outputRecords < spillRecords) {
+                sb.append("Total map output records: ").append(outputRecords);
+                sb.append(" Total map spilled records: ").append(spillRecords).append(".");
+
+                List<String> mapSettings = new ArrayList<>();
+                long minMapSpillMemBytes = context.getMinMapSpillMemBytes();
+                double spillPercent = context.getJobconf().getDouble(MAP_SORT_SPILL_PERCENT, 0.8);
+                if (minMapSpillMemBytes > 512 * FileUtils.ONE_MB * spillPercent) {
+                    // Large output: use the whole buffer before spilling rather than
+                    // inflating the buffer by 1/spillPercent.
+                    if (Math.abs(1.0 - spillPercent) > 0.001) {
+                        mapSettings.add(String.format("-D%s=1", MAP_SORT_SPILL_PERCENT));
+                    }
+                } else if (spillPercent > 0) {
+                    // Only spillPercent of the buffer is usable before a spill starts.
+                    minMapSpillMemBytes = (long) (minMapSpillMemBytes / spillPercent);
+                }
+
+                // Round up to the next multiple of 10 MB.
+                long minMapSpillMemMB = (minMapSpillMemBytes / FileUtils.ONE_MB + 10) / 10 * 10;
+                boolean bufferTooLarge = minMapSpillMemMB >= MAX_IO_SORT_MB;
+                if (!bufferTooLarge) {
+                    mapSettings.add(String.format("-D%s=%s", IO_SORT_MB, minMapSpillMemMB));
+                    long heapSizeMB = getMaxHeapSize(context.getJobconf().get(MAP_JAVA_OPTS));
+                    if (heapSizeMB < 3 * minMapSpillMemMB) {
+                        long expectedHeapSizeMB = (minMapSpillMemMB * 3 + 1024) / 1024 * 1024;
+                        mapSettings.add(String.format("-D%s=-Xmx%sM", MAP_JAVA_OPTS, expectedHeapSizeMB));
+                    }
+                }
+
+                if (!mapSettings.isEmpty()) {
+                    sb.append(" Please set");
+                    for (String setting : mapSettings) {
+                        sb.append(" ").append(setting);
+                    }
+                    sb.append(bufferTooLarge ? "." : " to avoid spilled records.");
+                    optSettings.addAll(mapSettings);
+                }
+                if (bufferTooLarge) {
+                    sb.append(" Please reduce the block size of the input files and make sure they are splittable")
+                        .append(" to avoid spilled records.");
+                }
+                sb.append("\n");
+            }
+
+            // Map-only jobs have no reduce side to inspect.
+            if (context.getNumReduces() > 0) {
+                long reduceInputRecords = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.REDUCE_INPUT_RECORDS);
+                spillRecords = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.SPILLED_RECORDS);
+                if (reduceInputRecords < spillRecords) {
+                    sb.append("Please add more memory (mapreduce.reduce.java.opts) to avoid spilled records.");
+                    sb.append(" Total Reduce input records: ").append(reduceInputRecords);
+                    sb.append(" Total Spilled Records: ").append(spillRecords);
+                    sb.append("\n");
+                }
+            }
+
+            if (sb.length() > 0) {
+                return new Result.ProcessorResult(Result.RuleType.SPILL, Result.ResultLevel.INFO, sb.toString(), optSettings);
+            }
+        } catch (NullPointerException e) {
+            //When job failed there may not have counters, so just ignore it
+        }
+        return null;
+    }
+
+    /**
+     * Extracts the maximum heap size from a JVM options string.
+     *
+     * <p>The last {@code -Xmx} occurrence is used, as that is the one the JVM honors. A
+     * value without a unit suffix is in bytes; {@code k}, {@code m} and {@code g} suffixes
+     * are converted accordingly.
+     *
+     * @param s the JVM options, e.g. {@code "-Xmx2g -XX:+UseG1GC"}; may be {@code null}
+     * @return the heap size in MB, or 0 when {@code s} is {@code null} or has no {@code -Xmx}
+     */
+    private static long getMaxHeapSize(String s) {
+        if (s == null) {
+            return 0;
+        }
+        Matcher m = MAX_HEAP_PATTERN.matcher(s);
+        long val = 0;
+        while (m.find()) {
+            val = Long.parseLong(m.group(1));
+            String unit = m.group(2);
+            if (unit.isEmpty()) {
+                val /= FileUtils.ONE_MB;
+            } else if ("k".equalsIgnoreCase(unit)) {
+                val /= 1024;
+            } else if ("g".equalsIgnoreCase(unit)) {
+                val *= 1024;
+            }
+        }
+        return val;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
new file mode 100644
index 0000000..8eba468
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MapReduceSplitSettingProcessor implements Processor<MapReduceAnalyzerEntity> {
+
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceSplitSettingProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+
+        if (context.getJobconf().getLong(FileInputFormat.SPLIT_MINSIZE, 0) > 1) {
+            sb.append("Best practice: don't set " + FileInputFormat.SPLIT_MINSIZE);
+            sb.append(", because it may lower data locality, hence maps will run slower.\n");
+            return new Result.ProcessorResult(Result.RuleType.SPLIT, Result.ResultLevel.INFO, sb.toString());
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java
new file mode 100644
index 0000000..00d5cc9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_REDUCES;
+
+/**
+ * Suggests a better number of map and reduce tasks, based on the average input size,
+ * output size and run time per task.
+ */
+public class MapReduceTaskNumProcessor implements Processor<MapReduceAnalyzerEntity> {
+    // 1024-based unit suffixes. "E" is required because a long can hold up to 8 EiB - 1.
+    private static final String[] SIZE_UNITS = {"B", "K", "M", "G", "T", "P", "E"};
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceTaskNumProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Runs the reducer-count and the mapper-count analysis.
+     *
+     * @param jobAnalysisEntity the analyzed job (the statistics are read from the context)
+     * @return an INFO result with the advice text and suggested {@code -D} settings, or
+     *         {@code null} when there is nothing to suggest or counters are missing
+     */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+        List<String> optSettings = new ArrayList<>();
+        try {
+            sb.append(analyzeReduceTaskNum(optSettings));
+            sb.append(analyzeMapTaskNum(optSettings));
+
+            if (sb.length() > 0) {
+                return new Result.ProcessorResult(Result.RuleType.TASK_NUMBER, Result.ResultLevel.INFO, sb.toString(), optSettings);
+            }
+        } catch (NullPointerException e) {
+            // When job failed there may not have counters, so just ignore it
+        }
+        return null;
+    }
+
+
+    /**
+     * Checks whether reducers process too little or too much data, or run too briefly or
+     * too long, and if so proposes a new reducer count.
+     *
+     * @param optSettings receives the suggested {@code -D} setting, if any
+     * @return the advice text, or an empty string when no change is suggested
+     */
+    private String analyzeReduceTaskNum(List<String> optSettings) {
+        long numReduces = context.getNumReduces();
+        if (numReduces <= 0) {
+            return "";
+        }
+
+        long avgReduceTime = context.getAvgReduceTimeInSec();
+        long avgShuffleTime = context.getAvgShuffleTimeInSec();
+        long avgShuffleBytes = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.REDUCE_SHUFFLE_BYTES)
+                / numReduces;
+        long avgReduceOutput = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.HDFS_BYTES_WRITTEN)
+                / numReduces;
+        long avgReduceTotalTime = avgShuffleTime + avgReduceTime;
+        boolean multipleReducers = numReduces > 1;
+
+        // Each rule that fires contributes one reason to the message.
+        StringBuilder reasons = new StringBuilder();
+
+        boolean tinyInput = multipleReducers && avgShuffleBytes < 256 * FileUtils.ONE_MB
+                && avgReduceTotalTime < 300 && avgReduceOutput < 256 * FileUtils.ONE_MB;
+        boolean hugeInput = avgShuffleBytes > 10 * FileUtils.ONE_GB && avgReduceTotalTime > 1800;
+        if (tinyInput || hugeInput) {
+            reasons.append("average reduce input bytes is: ").append(bytesToHumanReadable(avgShuffleBytes)).append(", ");
+        }
+
+        if (avgReduceTotalTime < 60 && multipleReducers) {
+            reasons.append("average reduce time is only ").append(avgReduceTotalTime).append(" seconds, ");
+        } else if (avgReduceTotalTime > 3600 && avgReduceTime > 1800) {
+            reasons.append("average reduce time is ").append(avgReduceTotalTime).append(" seconds, ");
+        }
+
+        String avgReduceOutputDisplaySize = bytesToHumanReadable(avgReduceOutput);
+        if (avgReduceOutput < 10 * FileUtils.ONE_MB && avgReduceTime < 300
+                && avgShuffleBytes < 2 * FileUtils.ONE_GB && multipleReducers) {
+            reasons.append(" average reduce output is only ").append(avgReduceOutputDisplaySize).append(", ");
+        } else if (avgReduceOutput > 10 * FileUtils.ONE_GB && avgReduceTime > 1800) {
+            reasons.append(" average reduce output is ").append(avgReduceOutputDisplaySize).append(", ");
+        }
+
+        if (reasons.length() == 0) {
+            return "";
+        }
+
+        long suggestReduces = getReduceNum(avgShuffleBytes, avgReduceOutput, avgReduceTime);
+        if (suggestReduces == numReduces) {
+            // Recommending the current value would be noise (and was reported as "decreasing").
+            return "";
+        }
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("Best practice: ").append(reasons).append("please consider ");
+        sb.append(suggestReduces > numReduces ? "increasing the " : "decreasing the ");
+        String setting = String.format("-D%s=%s", NUM_REDUCES, suggestReduces);
+        sb.append("reducer number. You could try ").append(setting).append("\n");
+        optSettings.add(setting);
+        return sb.toString();
+    }
+
+    /**
+     * Checks whether mappers read too little or too much data, or run too briefly or too
+     * long, and advises accordingly.
+     *
+     * @param optSettings unused; no concrete setting is derived for mappers
+     * @return the advice text, or an empty string when there is nothing to suggest or the
+     *         job declares no map tasks
+     */
+    private String analyzeMapTaskNum(List<String> optSettings) {
+        StringBuilder sb = new StringBuilder();
+
+        long numMaps = context.getNumMaps();
+        if (numMaps <= 0) {
+            // No per-mapper average can be computed (and dividing would throw).
+            return sb.toString();
+        }
+        long avgMapTime = context.getAvgMapTimeInSec();
+        long avgMapInput = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.HDFS_BYTES_READ)
+                / numMaps;
+        String avgMapInputDisplaySize = bytesToHumanReadable(avgMapInput);
+
+        if (avgMapInput < 5 * FileUtils.ONE_MB && avgMapTime < 30 && numMaps > 1) {
+            sb.append("Best practice: average map input bytes only have ").append(avgMapInputDisplaySize);
+            sb.append(". Please reduce the number of mappers by merging input files.\n");
+        } else if (avgMapInput > FileUtils.ONE_GB) {
+            sb.append("Best practice: average map input bytes have ").append(avgMapInputDisplaySize);
+            sb.append(". Please increase the number of mappers by using splittable compression, a container file format or a smaller block size.\n");
+        }
+
+        if (avgMapTime < 10 && numMaps > 1) {
+            sb.append("Best practice: average map time only have ").append(avgMapTime);
+            sb.append(" seconds. Please reduce the number of mappers by merging input files or by using a larger block size.\n");
+        } else if (avgMapTime > 600 && avgMapInput < FileUtils.ONE_GB) {
+            sb.append("Best practice: average map time is ").append(avgMapTime);
+            sb.append(" seconds. Please increase the number of mappers by using splittable compression, a container file format or a smaller block size.\n");
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * Proposes a reducer count such that each reducer gets about 3 GB of input, writes
+     * about 2 GB of output and runs for about 10 minutes; the largest of the three
+     * estimates wins.
+     *
+     * @param avgInputBytes average shuffle bytes per reducer
+     * @param avgOutputBytes average bytes written per reducer
+     * @param avgTime average reduce time per reducer, in seconds
+     * @return the proposed reducer count, at least 1
+     */
+    private long getReduceNum(long avgInputBytes, long avgOutputBytes, long avgTime) {
+        long numReduces = context.getNumReduces();
+
+        long byInput = avgInputBytes * numReduces / (3 * FileUtils.ONE_GB);
+        long byOutput = avgOutputBytes * numReduces / (2 * FileUtils.ONE_GB);
+        long byTime = avgTime * numReduces / (10 * 60);
+
+        return Math.max(1, Math.max(byInput, Math.max(byOutput, byTime)));
+    }
+
+
+    /**
+     * Formats a byte count with 1024-based units, showing the largest unit and, when
+     * non-zero, the remainder in the next smaller unit, e.g. 1.5 GiB becomes
+     * {@code "1G512M"} and 1536 becomes {@code "1K512B"}.
+     *
+     * @param bytes the size in bytes
+     * @return the formatted size
+     */
+    private static String bytesToHumanReadable(long bytes) {
+        long whole = bytes;
+        long remainder = 0;
+        int idx = 0;
+        // Integer arithmetic keeps the remainder exact and in the next smaller unit.
+        while (whole >= 1024 && idx < SIZE_UNITS.length - 1) {
+            remainder = whole % 1024;
+            whole /= 1024;
+            idx += 1;
+        }
+        StringBuilder sb = new StringBuilder();
+        sb.append(whole);
+        sb.append(SIZE_UNITS[idx]);
+        if (idx >= 1 && remainder > 0) {
+            sb.append(remainder);
+            sb.append(SIZE_UNITS[idx - 1]);
+        }
+        return sb.toString();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
index 6109704..0d7d2d7 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
@@ -18,23 +18,61 @@
 package org.apache.eagle.jpm.analyzer.publisher;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
 
 public class EagleStorePublisher implements Publisher, Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(EagleStorePublisher.class);
 
     private Config config;
+    private IEagleServiceClient client;
+    private AlertDeduplicator alertDeduplicator;
 
     public EagleStorePublisher(Config config) {
         this.config = config;
+        this.alertDeduplicator = new SimpleDeduplicator();
     }
 
     @Override
     public void publish(AnalyzerEntity analyzerJobEntity, Result result) {
+        if (result.getAlertMessages().size() == 0) {
+            return;
+        }
+
+        LOG.info("EagleStorePublisher gets job {}", analyzerJobEntity.getJobDefId());
+        if (alertDeduplicator.dedup(analyzerJobEntity, result)) {
+            LOG.info("skip job {} alert because it is duplicated", analyzerJobEntity.getJobDefId());
+            return;
+        }
+
+        try {
+            this.client = new EagleServiceClientImpl(config);
+            for (Map.Entry<String, List<TaggedLogAPIEntity>> entry : result.getAlertEntities().entrySet()) {
+                client.create(entry.getValue());
+                LOG.info("successfully persist {} entities for evaluator {}", entry.getValue().size(), entry.getKey());
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        } finally {
+            if (client != null) {
+                try {
+                    client.close();
+                } catch (IOException e) {
+                    LOG.error(e.getMessage(), e);
+                }
+            }
+        }
 
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
index 4e49094..842e0ac 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
@@ -18,12 +18,10 @@
 package org.apache.eagle.jpm.analyzer.publisher;
 
 import com.typesafe.config.Config;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.app.service.ApplicationEmailService;
 import org.apache.eagle.common.DateTimeUtil;
-import org.apache.eagle.common.mail.AlertEmailConstants;
 import org.apache.eagle.common.mail.AlertEmailContext;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
 import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator;
 import org.apache.eagle.jpm.analyzer.util.Constants;
@@ -31,7 +29,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.net.URLEncoder;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -69,19 +66,15 @@ public class EmailPublisher implements Publisher, Serializable {
         basic.put("end", analyzerJobEntity.getEndTime() == 0
                 ? "0"
                 : DateTimeUtil.millisecondsToHumanDateWithSeconds(analyzerJobEntity.getEndTime()));
-        basic.put("progress", analyzerJobEntity.getProgress() + "%");
+        double progress = analyzerJobEntity.getCurrentState().equalsIgnoreCase(org.apache.eagle.jpm.util.Constants.JobState.RUNNING.toString()) ? analyzerJobEntity.getProgress() : 100;
+        basic.put("progress", progress + "%");
         basic.put("detail", getJobLink(analyzerJobEntity));
 
-
-        Map<String, Map<String, String>> extend = new HashMap<>();
-        Map<String, List<Pair<Result.ResultLevel, String>>> alertMessages = result.getAlertMessages();
-        for (String evaluator : alertMessages.keySet()) {
-            List<Pair<Result.ResultLevel, String>> messages = alertMessages.get(evaluator);
-            extend.put(evaluator, new HashMap<>());
-            for (Pair<Result.ResultLevel, String> message : messages) {
+        Map<String, List<Result.ProcessorResult>> extend = result.getAlertMessages();
+        for (String evaluator : extend.keySet()) {
+            for (Result.ProcessorResult message : extend.get(evaluator)) {
                 LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]",
-                        analyzerJobEntity.getJobDefId(), message.getRight(), message.getLeft(), evaluator);
-                extend.get(evaluator).put(message.getRight(), message.getLeft().toString());
+                        analyzerJobEntity.getJobDefId(), message.getMessage(), message.getResultLevel(), evaluator);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
index 2f42bf9..1f42ef9 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
@@ -17,7 +17,7 @@
 
 package org.apache.eagle.jpm.analyzer.publisher;
 
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
 
 public interface Publisher {
     void publish(AnalyzerEntity analyzerJobEntity, Result result);