You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/01/12 02:23:29 UTC

[1/2] eagle git commit: [EAGLE-797] add job performance analysis

Repository: eagle
Updated Branches:
  refs/heads/master 2adbbf59f -> b4695801f


http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
new file mode 100644
index 0000000..39cec68
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
@@ -0,0 +1,131 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+<head>
+    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
+    <meta name="viewport" content="width=device-width"/>
+    <style type="text/css">
+        img {
+            max-width: 100%;
+        }
+
+        body {
+            -webkit-font-smoothing: antialiased;
+            -webkit-text-size-adjust: none;
+            width: 100% !important;
+            height: 100%;
+            line-height: 1.6em;
+        }
+
+        body {
+            background-color: #f6f6f6;
+        }
+
+        @media only screen and (max-width: 640px) {
+            body {
+                padding: 0 !important;
+            }
+
+            h1 {
+                font-weight: 800 !important;
+                margin: 20px 0 5px !important;
+            }
+
+            h2 {
+                font-weight: 800 !important;
+                margin: 20px 0 5px !important;
+            }
+
+            h3 {
+                font-weight: 800 !important;
+                margin: 20px 0 5px !important;
+            }
+
+            h4 {
+                font-weight: 800 !important;
+                margin: 20px 0 5px !important;
+            }
+
+            h1 {
+                font-size: 22px !important;
+            }
+
+            h2 {
+                font-size: 18px !important;
+            }
+
+            h3 {
+                font-size: 14px !important;
+            }
+
+            .container {
+                padding: 0 !important;
+                width: 100% !important;
+            }
+
+            .content {
+                padding: 0 !important;
+            }
+
+            .content-wrap {
+                padding: 10px !important;
+            }
+
+            .invoice {
+                width: 100% !important;
+            }
+        }
+    </style>
+</head>
+<body>
+    #set ( $elem = $alertList[0] )
+
+<p><b>Basic Information: </b></p>
+
+<ul>
+    <li>Site: ${elem["basic"].get("site")}</li>
+    <li>Job Name: ${elem["basic"].get("name")}</li>
+    <li>User: ${elem["basic"].get("user")}</li>
+    <li>Job Status: ${elem["basic"].get("status")}</li>
+    <li>Start Time: ${elem["basic"].get("start")}</li>
+    <li>End Time: ${elem["basic"].get("end")}</li>
+    <li>Duration Time: ${elem["basic"].get("duration")}</li>
+    <li>Progress: ${elem["basic"].get("progress")}</li>
+    <li>Job Detail: <a href=${elem["basic"].get("detail")}>${elem["basic"].get("detail")}</a></li>
+</ul>
+
+<p><b>Analyzer Results: </b></p>
+
+#foreach($evaluator in ${elem["extend"].keySet()})
+<table class="body-wrap" style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; width: 100%; background-color: #f6f6f6; margin: 0;" bgcolor="#f6f6f6" border="1">
+    <caption><b>Analysis By $evaluator</b></caption>
+    <tr>
+        <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="100"><b>level</b></td>
+        <th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="250"><b>message</b></th>
+    </tr>
+    #foreach($message in ${elem["extend"].get($evaluator).keySet()})
+        <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+            <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;"><b>${elem["extend"].get($evaluator).get($message)}</b></td>
+            <th style="...">$message</th>
+        </tr>
+    #end
+</table>
+#end
+
+</body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql
new file mode 100644
index 0000000..3d790d0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql
@@ -0,0 +1,23 @@
+CREATE TABLE IF NOT EXISTS jobs (
+  jobDefId VARCHAR(50) NOT NULL,
+  configuration MEDIUMTEXT NOT NULL,
+  createdtime bigint(20) DEFAULT NULL,
+  modifiedtime  bigint(20) DEFAULT NULL,
+  PRIMARY KEY (jobDefId)
+);
+
+CREATE TABLE IF NOT EXISTS job_evaluators (
+  jobDefId VARCHAR(50) NOT NULL,
+  evaluator VARCHAR(100) NOT NULL,
+  createdtime bigint(20) DEFAULT NULL,
+  modifiedtime  bigint(20) DEFAULT NULL,
+  PRIMARY KEY (jobDefId, evaluator)
+);
+
+CREATE TABLE IF NOT EXISTS job_publishments (
+  userId VARCHAR(100) PRIMARY KEY,
+  mailAddress mediumtext NOT NULL,
+  createdtime bigint(20) DEFAULT NULL,
+  modifiedtime  bigint(20) DEFAULT NULL,
+  PRIMARY KEY (userId)
+);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
index 6f337e7..c3d08d4 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
@@ -89,7 +89,6 @@ public class MRHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBa
         } catch (Exception e) {
             return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
         } finally {
-            client.getJerseyClient().destroy();
             try {
                 client.close();
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
index 6dc5791..01f98c0 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
@@ -84,7 +84,6 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
             } finally {
                 list.clear();
                 jobConfigurationEntity = null;
-                client.getJerseyClient().destroy();
                 client.close();
             }
             tried++;

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index 80b4049..2f77456 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -97,7 +97,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
             eagleServiceConfig.username,
             eagleServiceConfig.password);
 
-        client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+        client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
         logger.info("start flushing entities of total number " + list.size());
         List<GenericMetricEntity> metricEntities = new ArrayList<>();
         for (int i = 0; i < list.size(); i++) {
@@ -167,7 +167,6 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
 
         logger.info("finish flushing entities of total number " + list.size());
         list.clear();
-        client.getJerseyClient().destroy();
         client.close();
     }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
index 666b3db..856f051 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
@@ -120,7 +120,7 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
             eagleServiceConfig.username,
             eagleServiceConfig.password);
 
-        client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+        client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
         List<TaskAttemptCounterAPIEntity> list = new ArrayList<>();
         logger.info("start flushing TaskAttemptCounter entities of total number " + counters.size());
         // create entity
@@ -149,7 +149,6 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
         logger.info("end  flushing TaskAttemptCounter entities of total number " + counters.size());
         counters.clear();
         list.clear();
-        client.getJerseyClient().destroy();
         client.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index 40e6432..794f992 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -139,7 +139,6 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
             }
             tried++;
         }
-        client.getJerseyClient().destroy();
         client.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/pom.xml b/eagle-jpm/eagle-jpm-mr-running/pom.xml
index f6f1be5..02f2bc4 100644
--- a/eagle-jpm/eagle-jpm-mr-running/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/pom.xml
@@ -79,6 +79,16 @@
             <version>${powermock.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-analyzer</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-metadata-jdbc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <resources>

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
index de0d846..309146e 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
@@ -18,6 +18,7 @@ package org.apache.eagle.jpm.mr.running;
 
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
 import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
 import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt;
 import org.apache.eagle.jpm.util.Constants;
@@ -67,7 +68,8 @@ public class MRRunningJobApplication extends StormApplication {
                 mrRunningJobConfig.getEndpointConfig(),
                 mrRunningJobConfig.getZkStateConfig(),
                 confKeyKeys,
-                config),
+                config,
+                new MRJobPerformanceAnalyzer(config)),
                 tasks).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
         return topologyBuilder.createTopology();
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
index 5a57aca..6670282 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
@@ -18,6 +18,11 @@ package org.apache.eagle.jpm.mr.running;
 
 import org.apache.eagle.app.service.ApplicationListener;
 import org.apache.eagle.app.spi.AbstractApplicationProvider;
+import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.impl.MetaManagementServiceJDBCImpl;
+import org.apache.eagle.jpm.analyzer.meta.impl.MetaManagementServiceMemoryImpl;
+import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
+import org.apache.eagle.metadata.store.jdbc.JDBCMetadataStore;
 
 import java.util.Optional;
 
@@ -31,4 +36,10 @@ public class MRRunningJobApplicationProvider extends AbstractApplicationProvider
     public Optional<ApplicationListener> getApplicationListener() {
         return Optional.of(new MRRunningJobApplicationListener());
     }
+
+    @Override
+    protected void onRegister() {
+        bind(MemoryMetadataStore.class, MetaManagementService.class, MetaManagementServiceMemoryImpl.class);
+        bind(JDBCMetadataStore.class, MetaManagementService.class, MetaManagementServiceJDBCImpl.class);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
index c2cbbe5..2bc1648 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
@@ -85,7 +85,6 @@ public class MRJobEntityCreationHandler {
             LOG.warn("exception found when flush entities, {}", e);
             return false;
         } finally {
-            client.getJerseyClient().destroy();
             try {
                 client.close();
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 52c1866..c21eaf1 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -19,6 +19,8 @@
 package org.apache.eagle.jpm.mr.running.parser;
 
 import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
 import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
 import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
 import org.apache.eagle.jpm.mr.runningentity.JobConfig;
@@ -82,6 +84,7 @@ public class MRJobParser implements Runnable {
     private static final int FLUSH_TASKS_EVERY_TIME = 5;
     private static final int MAX_TASKS_PERMIT = 5000;
     private Config config;
+    private MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer;
 
     static {
         OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
@@ -92,7 +95,8 @@ public class MRJobParser implements Runnable {
                        AppInfo app, Map<String, JobExecutionAPIEntity> mrJobMap,
                        MRRunningJobManager runningJobManager, ResourceFetcher rmResourceFetcher,
                        List<String> configKeys,
-                       Config config) {
+                       Config config,
+                       MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer) {
         this.app = app;
         if (mrJobMap == null) {
             this.mrJobEntityMap = new HashMap<>();
@@ -112,6 +116,7 @@ public class MRJobParser implements Runnable {
         this.finishedTaskIds = new HashSet<>();
         this.configKeys = configKeys;
         this.config = config;
+        this.mrJobPerformanceAnalyzer = mrJobPerformanceAnalyzer;
     }
 
     public void setAppInfo(AppInfo app) {
@@ -168,6 +173,7 @@ public class MRJobParser implements Runnable {
                     break;
                 }
             }
+            mrJobPerformanceAnalyzer.analysis(convertToAnalysisEntity(mrJobEntityMap.get(jobId)));
         }
     }
 
@@ -586,4 +592,21 @@ public class MRJobParser implements Runnable {
             }
         }
     }
+
+    private AnalyzerEntity convertToAnalysisEntity(JobExecutionAPIEntity jobExecutionAPIEntity) {
+        AnalyzerEntity mrJobAnalysisEntity = new AnalyzerEntity();
+        Map<String, String> tags = jobExecutionAPIEntity.getTags();
+        mrJobAnalysisEntity.setJobDefId(tags.get(MRJobTagName.JOD_DEF_ID.toString()));
+        mrJobAnalysisEntity.setJobId(tags.get(MRJobTagName.JOB_ID.toString()));
+        mrJobAnalysisEntity.setSiteId(tags.get(MRJobTagName.SITE.toString()));
+        mrJobAnalysisEntity.setUserId(tags.get(MRJobTagName.USER.toString()));
+
+        mrJobAnalysisEntity.setStartTime(jobExecutionAPIEntity.getStartTime());
+        mrJobAnalysisEntity.setEndTime(jobExecutionAPIEntity.getEndTime());
+        mrJobAnalysisEntity.setDurationTime(jobExecutionAPIEntity.getDurationTime());
+        mrJobAnalysisEntity.setCurrentState(jobExecutionAPIEntity.getInternalState());
+        mrJobAnalysisEntity.setJobConfig(new HashMap<>(jobExecutionAPIEntity.getJobConfig()));
+        mrJobAnalysisEntity.setProgress(this.app.getProgress());
+        return mrJobAnalysisEntity;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
index 8ec2dec..915df8a 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
@@ -19,6 +19,7 @@
 package org.apache.eagle.jpm.mr.running.storm;
 
 import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
 import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
 import org.apache.eagle.jpm.mr.running.parser.MRJobParser;
 import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
@@ -51,18 +52,21 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
     private ResourceFetcher resourceFetcher;
     private List<String> configKeys;
     private Config config;
+    private MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer;
 
     public MRRunningJobParseBolt(MRRunningJobConfig.EagleServiceConfig eagleServiceConfig,
                                  MRRunningJobConfig.EndpointConfig endpointConfig,
                                  MRRunningJobConfig.ZKStateConfig zkStateConfig,
                                  List<String> configKeys,
-                                 Config config) {
+                                 Config config,
+                                 MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer) {
         this.eagleServiceConfig = eagleServiceConfig;
         this.endpointConfig = endpointConfig;
         this.runningMRParsers = new HashMap<>();
         this.zkStateConfig = zkStateConfig;
         this.configKeys = configKeys;
         this.config = config;
+        this.mrJobPerformanceAnalyzer = mrJobPerformanceAnalyzer;
     }
 
     @Override
@@ -83,7 +87,8 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
         MRJobParser applicationParser;
         if (!runningMRParsers.containsKey(appInfo.getId())) {
             applicationParser = new MRJobParser(endpointConfig, eagleServiceConfig,
-                    appInfo, mrJobs, runningJobManager, this.resourceFetcher, configKeys, this.config);
+                    appInfo, mrJobs, runningJobManager, this.resourceFetcher, configKeys, this.config,
+                    mrJobPerformanceAnalyzer);
             runningMRParsers.put(appInfo.getId(), applicationParser);
             LOG.info("create application parser for {}", appInfo.getId());
         } else {

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
index 787c9ac..fc674d6 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
@@ -23,6 +23,7 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
 import org.apache.eagle.jpm.mr.running.parser.MRJobParser;
 import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
 import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
@@ -88,7 +89,8 @@ public class MRRunningJobApplicationTest {
                 mrRunningJobConfig.getEndpointConfig(),
                 mrRunningJobConfig.getZkStateConfig(),
                 confKeyKeys,
-                config);
+                config,
+                new MRJobPerformanceAnalyzer(config));
         MRRunningJobManager mrRunningJobManager = mock(MRRunningJobManager.class);
         PowerMockito.whenNew(MRRunningJobManager.class).withArguments(mrRunningJobConfig.getZkStateConfig()).thenReturn(mrRunningJobManager);
         mrRunningJobParseBolt.prepare(null, null, null);

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
index e0b5533..7046f8b 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -24,6 +24,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.TestingServer;
+import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
 import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
 import org.apache.eagle.jpm.mr.running.parser.metrics.JobExecutionMetricsCreationListener;
 import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
@@ -131,7 +132,7 @@ public class MRJobParserTest {
         MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
         RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
         MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
-                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
 
 
         Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -186,7 +187,7 @@ public class MRJobParserTest {
         MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
         RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
         MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
-                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
 
 
         Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -228,7 +229,7 @@ public class MRJobParserTest {
         MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
         RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
         MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
-                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
 
 
         Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -272,7 +273,7 @@ public class MRJobParserTest {
         MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
         RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
         MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
-                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
 
 
         Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -318,7 +319,7 @@ public class MRJobParserTest {
         RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class);
         when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList());
         MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
-                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
 
 
         Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -357,7 +358,7 @@ public class MRJobParserTest {
             eagleServiceConfig.username,
             eagleServiceConfig.password).thenReturn(client);
         when(client.create(any())).thenThrow(Exception.class).thenReturn(null);
-        when(client.getJerseyClient()).thenReturn(new Client());
+        //when(client.getJerseyClient()).thenReturn(new Client());
         mockInputJobSteam("/mrjob_30784.json", JOB_URL);
         mockInputJobSteamWithException(JOB_COUNT_URL);
         mockGetConnection("/mrconf_30784.xml");
@@ -377,7 +378,7 @@ public class MRJobParserTest {
         RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class);
         when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList());
         MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
-                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+                app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
 
 
         Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -401,7 +402,7 @@ public class MRJobParserTest {
         Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) != null);
         Assert.assertTrue(entities.isEmpty());
         verify(client, times(2)).create(any());
-        verify(client, times(1)).getJerseyClient();
+        //verify(client, times(1)).getJerseyClient();
         verify(client, times(1)).close();
 
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
index fdfcaad..8127aa2 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
@@ -50,7 +50,7 @@ public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthChec
                 eagleServiceConfig.username,
                 eagleServiceConfig.password);
 
-        client.getJerseyClient().setReadTimeout(eagleServiceConfig.timeout * 1000);
+        client.setReadTimeout(eagleServiceConfig.timeout * 1000);
 
         String message = "";
         try {
@@ -88,7 +88,6 @@ public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthChec
         } catch (Exception e) {
             return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
         } finally {
-            client.getJerseyClient().destroy();
             try {
                 client.close();
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
index 05e35e4..2ef1bd9 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
@@ -708,7 +708,7 @@ public class JHFSparkEventReader {
             config.eagleInfo.username,
             config.eagleInfo.password);
         int timeout = config.eagleInfo.timeout;
-        client.getJerseyClient().setReadTimeout(timeout * 1000);
+        client.setReadTimeout(timeout * 1000);
 
         return client;
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
index 92adfa8..adef27f 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
@@ -57,7 +57,7 @@ public class SparkAppEntityCreationHandler {
                 eagleServiceConfig.eagleServicePort,
                 eagleServiceConfig.username,
                 eagleServiceConfig.password)) {
-            client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+            client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
             LOG.info("start to flush spark app entities, size {}", entities.size());
             client.create(entities);
             LOG.info("finish flushing spark app entities, size {}", entities.size());

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
index 9025d36..a065373 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
@@ -39,7 +39,7 @@ public class Utils {
             try {
                 is.close();
             } catch (Exception e) {
-                e.printStackTrace();
+                LOG.warn("{}", e);
             }
         }
     }
@@ -48,7 +48,7 @@ public class Utils {
         try {
             Thread.sleep(seconds * 1000);
         } catch (Exception e) {
-            e.printStackTrace();
+            LOG.warn("{}", e);
         }
     }
 
@@ -60,7 +60,7 @@ public class Utils {
             Date parsedDate = dateFormat.parse(date);
             timestamp = parsedDate.getTime();
         } catch (ParseException e) {
-            e.printStackTrace();
+            LOG.warn("{}", e);
         }
 
         if (timestamp == 0L) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/pom.xml b/eagle-jpm/pom.xml
index 1ffdb03..e7ae3c3 100644
--- a/eagle-jpm/pom.xml
+++ b/eagle-jpm/pom.xml
@@ -32,6 +32,7 @@
     <packaging>pom</packaging>
 
     <modules>
+        <module>eagle-jpm-analyzer</module>
         <module>eagle-jpm-spark-running</module>
         <module>eagle-jpm-spark-history</module>
         <module>eagle-jpm-mr-history</module>

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-server-assembly/src/main/conf/eagle.conf
----------------------------------------------------------------------
diff --git a/eagle-server-assembly/src/main/conf/eagle.conf b/eagle-server-assembly/src/main/conf/eagle.conf
index af7e14a..a889914 100644
--- a/eagle-server-assembly/src/main/conf/eagle.conf
+++ b/eagle-server-assembly/src/main/conf/eagle.conf
@@ -108,6 +108,12 @@ application {
     recipients: "nobody@abc.com"
     template: "JobReportTemplate.vm"
   }
+  analyzerReport {
+    sender: "nobody@abc.com"
+    recipients: "nobody@abc.com"
+    template: "AnalyzerReportTemplate.vm"
+    cc: "nobody@abc.com"
+  }
 }
 
 # ---------------------------------------------

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
index bf5e695..950bb04 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
@@ -50,7 +50,7 @@ public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckB
                 topologyCheckAppConfig.getConfig().getString("service.username"),
                 topologyCheckAppConfig.getConfig().getString("service.password"));
 
-        client.getJerseyClient().setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000);
+        client.setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000);
 
         String message = "";
         try {
@@ -80,7 +80,6 @@ public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckB
         } catch (Exception e) {
             return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
         } finally {
-            client.getJerseyClient().destroy();
             try {
                 client.close();
             } catch (Exception e) {


[2/2] eagle git commit: [EAGLE-797] add job performance analysis

Posted by qi...@apache.org.
[EAGLE-797] add job performance analysis

for continuous development

Author: wujinhu <wu...@126.com>

Closes #762 from wujinhu/EAGLE-797.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/b4695801
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/b4695801
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/b4695801

Branch: refs/heads/master
Commit: b4695801f6c93ca0991fd9758c470b784aa8c781
Parents: 2adbbf5
Author: wujinhu <wu...@126.com>
Authored: Thu Jan 12 10:23:02 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Thu Jan 12 10:23:02 2017 +0800

----------------------------------------------------------------------
 .../app/spi/AbstractApplicationProvider.java    |   3 +-
 ...adoopQueueRunningApplicationHealthCheck.java |   3 +-
 .../jpm/aggregation/mr/MRMetricAggregator.java  |   3 +
 .../mr/MRMetricsAggregateContainer.java         |   1 +
 eagle-jpm/eagle-jpm-analyzer/pom.xml            |  38 ++++++
 .../eagle/jpm/analyzer/AnalyzerEntity.java      | 130 ++++++++++++++++++
 .../apache/eagle/jpm/analyzer/Evaluator.java    |  24 ++++
 .../apache/eagle/jpm/analyzer/JobAnalyzer.java  |  28 ++++
 .../apache/eagle/jpm/analyzer/Processor.java    |  24 ++++
 .../analyzer/meta/MetaManagementService.java    |  39 ++++++
 .../impl/MetaManagementServiceJDBCImpl.java     |  77 +++++++++++
 .../impl/MetaManagementServiceMemoryImpl.java   | 127 ++++++++++++++++++
 .../jpm/analyzer/meta/model/JobMetaEntity.java  |  85 ++++++++++++
 .../analyzer/meta/model/PublisherEntity.java    |  77 +++++++++++
 .../analyzer/mr/MRJobPerformanceAnalyzer.java   |  65 +++++++++
 .../jpm/analyzer/mr/sla/SLAJobEvaluator.java    |  67 ++++++++++
 .../sla/processors/LongStuckJobProcessor.java   |  43 ++++++
 .../UnExpectedLongDurationJobProcessor.java     | 120 +++++++++++++++++
 .../mr/suggestion/JobSuggestionEvaluator.java   |  44 +++++++
 .../analyzer/publisher/EagleStorePublisher.java |  40 ++++++
 .../jpm/analyzer/publisher/EmailPublisher.java  | 109 +++++++++++++++
 .../eagle/jpm/analyzer/publisher/Publisher.java |  24 ++++
 .../eagle/jpm/analyzer/publisher/Result.java    | 109 +++++++++++++++
 .../publisher/dedup/AlertDeduplicator.java      |  25 ++++
 .../dedup/impl/SimpleDeduplicator.java          |  59 +++++++++
 .../jpm/analyzer/resource/AnalyzerResource.java | 131 +++++++++++++++++++
 .../eagle/jpm/analyzer/util/Constants.java      |  65 +++++++++
 .../apache/eagle/jpm/analyzer/util/Utils.java   |  74 +++++++++++
 .../main/resources/AnalyzerReportTemplate.vm    | 131 +++++++++++++++++++
 .../src/main/resources/createTable.sql          |  23 ++++
 .../MRHistoryJobApplicationHealthCheck.java     |   1 -
 ...JobConfigurationCreationServiceListener.java |   1 -
 .../JobEntityCreationEagleServiceListener.java  |   3 +-
 .../parser/TaskAttemptCounterListener.java      |   3 +-
 .../mr/history/parser/TaskFailureListener.java  |   1 -
 eagle-jpm/eagle-jpm-mr-running/pom.xml          |  10 ++
 .../jpm/mr/running/MRRunningJobApplication.java |   4 +-
 .../MRRunningJobApplicationProvider.java        |  11 ++
 .../parser/MRJobEntityCreationHandler.java      |   1 -
 .../jpm/mr/running/parser/MRJobParser.java      |  25 +++-
 .../mr/running/storm/MRRunningJobParseBolt.java |   9 +-
 .../mr/running/MRRunningJobApplicationTest.java |   4 +-
 .../jpm/mr/running/parser/MRJobParserTest.java  |  17 +--
 .../SparkHistoryJobApplicationHealthCheck.java  |   3 +-
 .../history/crawl/JHFSparkEventReader.java      |   2 +-
 .../parser/SparkAppEntityCreationHandler.java   |   2 +-
 .../java/org/apache/eagle/jpm/util/Utils.java   |   6 +-
 eagle-jpm/pom.xml                               |   1 +
 eagle-server-assembly/src/main/conf/eagle.conf  |   6 +
 .../TopologyCheckApplicationHealthCheck.java    |   3 +-
 50 files changed, 1868 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
index 2a8d7c0..e537643 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
@@ -19,6 +19,7 @@ package org.apache.eagle.app.spi;
 
 import com.google.common.base.Preconditions;
 import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.app.Application;
 import org.apache.eagle.app.service.ApplicationListener;
@@ -130,7 +131,7 @@ public abstract class AbstractApplicationProvider<T extends Application> impleme
         currentRegistry.register(scope, new AbstractModule() {
             @Override
             protected void configure() {
-                bind(type).to(impl);
+                bind(type).to(impl).in(Singleton.class);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
index 6471dfc..5a5d0ee 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
@@ -52,7 +52,7 @@ public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthC
                 eagleServiceConfig.eagleService.username,
                 eagleServiceConfig.eagleService.password);
 
-        client.getJerseyClient().setReadTimeout(60000);
+        client.setReadTimeout(60000);
 
         String message = "";
         try {
@@ -91,7 +91,6 @@ public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthC
         } catch (Exception e) {
             return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
         } finally {
-            client.getJerseyClient().destroy();
             try {
                 client.close();
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
index f8840b2..54bd29b 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
@@ -74,6 +74,8 @@ public class MRMetricAggregator implements MetricAggregator, Serializable {
                 .endTime(endTime)
                 .pageSize(Integer.MAX_VALUE)
                 .send();
+
+            client.close();
         } catch (Exception e) {
             LOG.warn("{}", e);
             return false;
@@ -151,6 +153,7 @@ public class MRMetricAggregator implements MetricAggregator, Serializable {
             client.create(entities);
             LOG.info("finish flushing entities of total number " + entities.size());
             entities.clear();
+            client.close();
         } catch (Exception e) {
             LOG.warn("{}", e);
             return false;

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
index 45bbcef..dd1980b 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
@@ -74,6 +74,7 @@ public class MRMetricsAggregateContainer implements MetricsAggregateContainer, S
 
             List<Map<List<String>, List<Double>>> results = response.getObj();
             long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+            client.close();
             return currentProcessTimeStamp;
         } catch (Exception e) {
             LOG.warn("{}", e);

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/pom.xml b/eagle-jpm/eagle-jpm-analyzer/pom.xml
new file mode 100644
index 0000000..d6383b6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>eagle-jpm-parent</artifactId>
+        <groupId>org.apache.eagle</groupId>
+        <version>0.5.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>eagle-jpm-analyzer</artifactId>
+    <name>Eagle::App::JPM::Analyzer</name>
+    <url>http://maven.apache.org</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-metadata-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-jpm-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-app-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
new file mode 100644
index 0000000..f9b7af0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * will refactor later if other types of job needs this.
+ * AnalyzerEntity for each job needed to be analysised
+ */
+public class AnalyzerEntity {
+    private String jobDefId;
+    private String jobId;
+    private String siteId;
+    private String userId;
+
+    private long startTime;
+    private long endTime;
+    private long durationTime;
+    private String currentState;
+    private double progress;
+
+    private Map<String, Object> jobConfig = new HashMap<>();
+
+    private Map<String, Object> jobMeta = new HashMap<>();
+
+    public String getJobDefId() {
+        return jobDefId;
+    }
+
+    public void setJobDefId(String jobDefId) {
+        this.jobDefId = jobDefId;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public void setJobId(String jobId) {
+        this.jobId = jobId;
+    }
+
+    public String getSiteId() {
+        return siteId;
+    }
+
+    public void setSiteId(String siteId) {
+        this.siteId = siteId;
+    }
+
+    public String getUserId() {
+        return userId;
+    }
+
+    public void setUserId(String userId) {
+        this.userId = userId;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(long endTime) {
+        this.endTime = endTime;
+    }
+
+    public long getDurationTime() {
+        return durationTime;
+    }
+
+    public void setDurationTime(long durationTime) {
+        this.durationTime = durationTime;
+    }
+
+    public String getCurrentState() {
+        return currentState;
+    }
+
+    public void setCurrentState(String currentState) {
+        this.currentState = currentState;
+    }
+
+    public Map<String, Object> getJobConfig() {
+        return jobConfig;
+    }
+
+    public void setJobConfig(Map<String, Object> jobConfig) {
+        this.jobConfig = jobConfig;
+    }
+
+    public Map<String, Object> getJobMeta() {
+        return jobMeta;
+    }
+
+    public void setJobMeta(Map<String, Object> jobMeta) {
+        this.jobMeta = jobMeta;
+    }
+
+    public double getProgress() {
+        return progress;
+    }
+
+    public void setProgress(double progress) {
+        this.progress = progress;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
new file mode 100644
index 0000000..6617916
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+public interface Evaluator {
+    Result.EvaluatorResult evaluate(AnalyzerEntity analyzerEntity);
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
new file mode 100644
index 0000000..6cda1cd
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+/**
+ * Each JobAnalyzer contains one or more Evaluators to analysis each job.
+ * Each Evaluator is a group of Processors
+ * Each Processor implements an algorithm or a model to analysis one dimension of a job
+ *
+ */
+public interface JobAnalyzer {
+    void analysis(AnalyzerEntity analyzerEntity) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
new file mode 100644
index 0000000..d5a8a74
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+public interface Processor {
+    Result.ProcessorResult process(AnalyzerEntity jobAnalysisEntity);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
new file mode 100644
index 0000000..0935266
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta;
+
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+
+import java.util.List;
+
+public interface MetaManagementService {
+    boolean addJobMeta(JobMetaEntity jobMetaEntity);
+
+    boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity);
+
+    List<JobMetaEntity> getJobMeta(String jobDefId);
+
+    boolean deleteJobMeta(String jobDefId);
+
+    boolean addPublisherMeta(PublisherEntity publisherEntity);
+
+    boolean deletePublisherMeta(String userId);
+
+    List<PublisherEntity> getPublisherMeta(String userId);
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
new file mode 100644
index 0000000..cfb5029
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.impl;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class MetaManagementServiceJDBCImpl implements MetaManagementService, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceJDBCImpl.class);
+
+    @Inject
+    Config config;
+
+    @Override
+    public boolean addJobMeta(JobMetaEntity jobMetaEntity) {
+
+        return true;
+    }
+
+    @Override
+    public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) {
+
+        return true;
+    }
+
+    @Override
+    public List<JobMetaEntity> getJobMeta(String jobDefId) {
+
+        return null;
+    }
+
+    @Override
+    public boolean deleteJobMeta(String jobDefId) {
+
+        return true;
+    }
+
+    @Override
+    public boolean addPublisherMeta(PublisherEntity publisherEntity) {
+
+        return true;
+    }
+
+    @Override
+    public boolean deletePublisherMeta(String userId) {
+
+        return true;
+    }
+
+    @Override
+    public List<PublisherEntity> getPublisherMeta(String userId) {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
new file mode 100644
index 0000000..85e8358
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.impl;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.*;
+
+public class MetaManagementServiceMemoryImpl implements MetaManagementService, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceMemoryImpl.class);
+
+    private final Map<String, JobMetaEntity> jobMetaEntities = new HashMap<>();
+    private final Map<String, List<PublisherEntity>> publisherEntities = new HashMap<>();
+
+    @Inject
+    Config config;
+
+    @Override
+    public boolean addJobMeta(JobMetaEntity jobMetaEntity) {
+        if (jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) {
+            LOG.warn("contains job {} already, add job meta failed", jobMetaEntity.getJobDefId());
+            return false;
+        }
+
+        jobMetaEntities.put(jobMetaEntity.getJobDefId(), jobMetaEntity);
+        LOG.info("Successfully add job {} meta", jobMetaEntity.getJobDefId());
+        return true;
+    }
+
+    @Override
+    public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) {
+        if (!jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) {
+            LOG.warn("does not contain job {}, update job meta failed", jobDefId);
+            return false;
+        }
+
+        jobMetaEntities.put(jobDefId, jobMetaEntity);
+        LOG.info("Successfully update job {} meta", jobDefId);
+        return true;
+    }
+
+    @Override
+    public List<JobMetaEntity> getJobMeta(String jobDefId) {
+        if (!jobMetaEntities.containsKey(jobDefId)) {
+            LOG.warn("does not contain job {}, get job meta failed", jobDefId);
+            return new ArrayList<>();
+        }
+
+        return Arrays.asList(jobMetaEntities.get(jobDefId));
+    }
+
+    @Override
+    public boolean deleteJobMeta(String jobDefId) {
+        if (!jobMetaEntities.containsKey(jobDefId)) {
+            LOG.warn("does not contain job {}, delete job meta failed", jobDefId);
+            return false;
+        }
+
+        jobMetaEntities.remove(jobDefId);
+        LOG.info("Successfully delete job {} meta", jobDefId);
+        return true;
+    }
+
+    @Override
+    public boolean addPublisherMeta(PublisherEntity publisherEntity) {
+        if (publisherEntities.containsKey(publisherEntity.getUserId())) {
+            for (PublisherEntity entity : publisherEntities.get(publisherEntity.getUserId())) {
+                if (entity.equals(publisherEntity)) {
+                    LOG.warn("contains user {}, mailAddress {} already, add publisher failed", entity.getUserId(), entity.getMailAddress());
+                    return false;
+                }
+            }
+        }
+
+        if (!publisherEntities.containsKey(publisherEntity.getUserId())) {
+            publisherEntities.put(publisherEntity.getUserId(), new ArrayList<>());
+        }
+
+        publisherEntities.get(publisherEntity.getUserId()).add(publisherEntity);
+        LOG.info("Successfully add publisher user {}, mailAddress {}", publisherEntity.getUserId(), publisherEntity.getMailAddress());
+        return true;
+    }
+
+    @Override
+    public boolean deletePublisherMeta(String userId) {
+        if (!publisherEntities.containsKey(userId)) {
+            LOG.warn("does not contain user {}, failed to delete publisher", userId);
+            return false;
+        }
+
+        publisherEntities.remove(userId);
+        LOG.info("Successfully delete publisher user " + userId);
+        return true;
+    }
+
+    @Override
+    public List<PublisherEntity> getPublisherMeta(String userId) {
+        if (!publisherEntities.containsKey(userId)) {
+            LOG.warn("does not contain user {}, failed to get publisher", userId);
+            return new ArrayList<>();
+        }
+
+        return publisherEntities.get(userId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
new file mode 100644
index 0000000..2e15c17
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.model;
+
+import org.apache.eagle.metadata.persistence.PersistenceEntity;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class JobMetaEntity extends PersistenceEntity {
+    private String jobDefId;
+    private String siteId;
+    private Map<String, Object> configuration = new HashMap<>();
+    private Set<String> evaluators = new HashSet<>();
+
+    public JobMetaEntity() {
+
+    }
+
+    public JobMetaEntity(String jobDefId,
+                         String siteId,
+                         Map<String, Object> configuration,
+                         Set<String> evaluators) {
+        this.jobDefId = jobDefId;
+        this.siteId = siteId;
+        this.configuration = configuration;
+        this.evaluators = evaluators;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("JobMetaEntity[jobDefId=%s, siteId=%s]", jobDefId, siteId);
+    }
+
+    public String getJobDefId() {
+        return jobDefId;
+    }
+
+    public void setJobDefId(String jobDefId) {
+        this.jobDefId = jobDefId;
+    }
+
+    public String getSiteId() {
+        return siteId;
+    }
+
+    public void setSiteId(String siteId) {
+        this.siteId = siteId;
+    }
+
+    public Map<String, Object> getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(Map<String, Object> configuration) {
+        this.configuration = configuration;
+    }
+
+    public Set<String> getEvaluators() {
+        return evaluators;
+    }
+
+    public void setEvaluators(Set<String> evaluators) {
+        this.evaluators = evaluators;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java
new file mode 100644
index 0000000..bca7ab1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.model;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.metadata.persistence.PersistenceEntity;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PublisherEntity extends PersistenceEntity {
+    private String userId;
+    private String mailAddress;
+
+    public PublisherEntity(String userId, String mailAddress) {
+        this.userId = userId;
+        this.mailAddress = mailAddress;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("PublisherEntity[userId=%s, mailAddress=%s]", userId, mailAddress);
+    }
+
+    public String getUserId() {
+        return userId;
+    }
+
+    public void setUserId(String userId) {
+        this.userId = userId;
+    }
+
+    public String getMailAddress() {
+        return mailAddress;
+    }
+
+    public void setMailAddress(String mailAddress) {
+        this.mailAddress = mailAddress;
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder()
+                .append(userId)
+                .append(mailAddress)
+                .build();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+        if (that == this) {
+            return true;
+        }
+
+        if (!(that instanceof PublisherEntity)) {
+            return false;
+        }
+
+        PublisherEntity another = (PublisherEntity)that;
+
+        return another.userId.equals(this.userId) && another.mailAddress.equals(this.mailAddress);
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
new file mode 100644
index 0000000..e0e579a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.*;
+import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator;
+import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator;
+import org.apache.eagle.jpm.analyzer.publisher.EagleStorePublisher;
+import org.apache.eagle.jpm.analyzer.publisher.EmailPublisher;
+import org.apache.eagle.jpm.analyzer.publisher.Publisher;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MRJobPerformanceAnalyzer implements JobAnalyzer, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(MRJobPerformanceAnalyzer.class);
+
+    private List<Evaluator> evaluators = new ArrayList<>();
+    private List<Publisher> publishers = new ArrayList<>();
+
+    private Config config;
+
+    public MRJobPerformanceAnalyzer(Config config) {
+        this.config = config;
+        evaluators.add(new SLAJobEvaluator(config));
+        evaluators.add(new JobSuggestionEvaluator(config));
+
+        publishers.add(new EagleStorePublisher(config));
+        publishers.add(new EmailPublisher(config));
+    }
+
+    @Override
+    public void analysis(AnalyzerEntity analyzerJobEntity) throws Exception {
+        Result result = new Result();
+
+        for (Evaluator evaluator : evaluators) {
+            result.addEvaluatorResult(evaluator.getClass(), evaluator.evaluate(analyzerJobEntity));
+        }
+
+        for (Publisher publisher : publishers) {
+            publisher.publish(analyzerJobEntity, result);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
new file mode 100644
index 0000000..f10b68d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.sla;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.mr.sla.processors.LongStuckJobProcessor;
+import org.apache.eagle.jpm.analyzer.mr.sla.processors.UnExpectedLongDurationJobProcessor;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.analyzer.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SLAJobEvaluator implements Evaluator, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(SLAJobEvaluator.class);
+
+    private List<Processor> processors = new ArrayList<>();
+    private Config config;
+
+    public SLAJobEvaluator(Config config) {
+        this.config = config;
+        processors.add(new UnExpectedLongDurationJobProcessor(config));
+        processors.add(new LongStuckJobProcessor(config));
+    }
+
+    @Override
+    public Result.EvaluatorResult evaluate(AnalyzerEntity analyzerJobEntity) {
+        Result.EvaluatorResult result = new Result.EvaluatorResult();
+
+        List<JobMetaEntity> jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getJobDefId());
+        if (jobMetaEntities.size() == 0
+                || !jobMetaEntities.get(0).getEvaluators().contains(this.getClass().getName())) {
+            LOG.info("SLAJobEvaluator skip job {}", analyzerJobEntity.getJobDefId());
+            return result;
+        }
+
+        analyzerJobEntity.setJobMeta(jobMetaEntities.get(0).getConfiguration());
+
+        for (Processor processor : processors) {
+            result.addProcessorResult(processor.getClass(), processor.process(analyzerJobEntity));
+        }
+
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
new file mode 100644
index 0000000..35f3b27
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.sla.processors;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class LongStuckJobProcessor implements Processor, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(LongStuckJobProcessor.class);
+
+    private Config config;
+
+    public LongStuckJobProcessor(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) {
+        LOG.info("Job {} In LongStuckJobProcessor", analyzerJobEntity.getJobDefId());
+        return new Result.ProcessorResult(Result.ResultLevel.NONE, "");
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
new file mode 100644
index 0000000..9d4ce2b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.sla.processors;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.util.Constants;
+import org.apache.eagle.jpm.analyzer.util.Utils;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URLEncoder;
+import java.util.List;
+import java.util.Map;
+
+public class UnExpectedLongDurationJobProcessor implements Processor, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(UnExpectedLongDurationJobProcessor.class);
+
+    private Config config;
+
+    public UnExpectedLongDurationJobProcessor(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) {
+        LOG.info("Job {} In UnExpectedLongDurationJobProcessor", analyzerJobEntity.getJobDefId());
+
+        Map<String, Object> jobMetaData = analyzerJobEntity.getJobMeta();
+        long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData);
+        if (avgDurationTime == 0L) {
+            return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE);
+        }
+
+        Map<Result.ResultLevel, Double> alertThreshold = Constants.DEFAULT_ALERT_THRESHOLD;
+        if (jobMetaData.containsKey(Constants.ALERT_THRESHOLD_KEY)) {
+            alertThreshold = (Map<Result.ResultLevel, Double>)jobMetaData.get(Constants.ALERT_THRESHOLD_KEY);
+        }
+        List<Map.Entry<Result.ResultLevel, Double>> sorted = Utils.sortByValue(alertThreshold);
+
+        double expirePercent = (analyzerJobEntity.getDurationTime() - avgDurationTime) * 1.0 / avgDurationTime;
+        for (Map.Entry<Result.ResultLevel, Double> entry : sorted) {
+            if (expirePercent >= entry.getValue()) {
+                return new Result.ProcessorResult(entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is %ds",
+                        (int)(expirePercent * 100), avgDurationTime / 1000));
+            }
+        }
+
+        return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE);
+    }
+
+    private long getAvgDuration(AnalyzerEntity mrJobAnalysisEntity, Map<String, Object> jobMetaData) {
+        IEagleServiceClient client = new EagleServiceClientImpl(
+                config.getString(Constants.HOST_PATH),
+                config.getInt(Constants.PORT_PATH),
+                config.getString(Constants.USERNAME_PATH),
+                config.getString(Constants.PASSWORD_PATH));
+
+        client.setReadTimeout(config.getInt(Constants.READ_TIMEOUT_PATH) * 1000);
+
+        try {
+            int timeLength = Constants.DEFAULT_EVALUATOR_TIME_LENGTH;
+            try {
+                if (jobMetaData.containsKey(Constants.EVALUATOR_TIME_LENGTH_KEY)) {
+                    timeLength = Integer.parseInt(jobMetaData.get(Constants.EVALUATOR_TIME_LENGTH_KEY).toString());
+                }
+            } catch (Exception e) {
+                LOG.warn("exception found when parse timeLength {}, use default", e);
+            }
+
+            String query = String.format("%s[@site=\"%s\" and @jobDefId=\"%s\"]<@site>{avg(durationTime)}",
+                    org.apache.eagle.jpm.util.Constants.JPA_JOB_EXECUTION_SERVICE_NAME,
+                    mrJobAnalysisEntity.getSiteId(),
+                    URLEncoder.encode(mrJobAnalysisEntity.getJobDefId()));
+
+            GenericServiceAPIResponseEntity response = client
+                    .search(query)
+                    .startTime(System.currentTimeMillis() - (timeLength + 1) * 24 * 60 * 60000L)
+                    .endTime(System.currentTimeMillis() - 24 * 60 * 60000L)
+                    .pageSize(10)
+                    .send();
+
+            List<Map<List<String>, List<Double>>> results = response.getObj();
+            if (results.size() == 0) {
+                return 0L;
+            }
+            return results.get(0).get("value").get(0).longValue();
+        } catch (Exception e) {
+            LOG.warn("{}", e);
+            return 0L;
+        } finally {
+            try {
+                client.close();
+            } catch (Exception e) {
+                LOG.warn("{}", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
new file mode 100644
index 0000000..79f5318
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class JobSuggestionEvaluator implements Evaluator, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(JobSuggestionEvaluator.class);
+
+    private Config config;
+
+    public JobSuggestionEvaluator(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public Result.EvaluatorResult evaluate(AnalyzerEntity mrJobEntity) {
+        Result.EvaluatorResult result = new Result.EvaluatorResult();
+        //TODO
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
new file mode 100644
index 0000000..6109704
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class EagleStorePublisher implements Publisher, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(EagleStorePublisher.class);
+
+    private Config config;
+
+    public EagleStorePublisher(Config config) {
+        this.config = config;
+    }
+
+    @Override
+    public void publish(AnalyzerEntity analyzerJobEntity, Result result) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
new file mode 100644
index 0000000..4e49094
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.app.service.ApplicationEmailService;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.mail.AlertEmailConstants;
+import org.apache.eagle.common.mail.AlertEmailContext;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator;
+import org.apache.eagle.jpm.analyzer.util.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EmailPublisher implements Publisher, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(EmailPublisher.class);
+
+    private Config config;
+    private AlertDeduplicator alertDeduplicator;
+
+    public EmailPublisher(Config config) {
+        this.config = config;
+        this.alertDeduplicator = new SimpleDeduplicator();
+    }
+
+    @Override
+    public void publish(AnalyzerEntity analyzerJobEntity, Result result) {
+        if (result.getAlertMessages().size() == 0) {
+            return;
+        }
+
+        LOG.info("EmailPublisher gets job {}", analyzerJobEntity.getJobDefId());
+        if (alertDeduplicator.dedup(analyzerJobEntity, result)) {
+            LOG.info("skip job {} alert because it is duplicated", analyzerJobEntity.getJobDefId());
+            return;
+        }
+
+        Map<String, String> basic = new HashMap<>();
+        basic.put("site", analyzerJobEntity.getSiteId());
+        basic.put("name", analyzerJobEntity.getJobDefId());
+        basic.put("user", analyzerJobEntity.getUserId());
+        basic.put("status", analyzerJobEntity.getCurrentState());
+        basic.put("duration", analyzerJobEntity.getDurationTime() * 1.0 / 1000 + "s");
+        basic.put("start", DateTimeUtil.millisecondsToHumanDateWithSeconds(analyzerJobEntity.getStartTime()));
+        basic.put("end", analyzerJobEntity.getEndTime() == 0
+                ? "0"
+                : DateTimeUtil.millisecondsToHumanDateWithSeconds(analyzerJobEntity.getEndTime()));
+        basic.put("progress", analyzerJobEntity.getProgress() + "%");
+        basic.put("detail", getJobLink(analyzerJobEntity));
+
+
+        Map<String, Map<String, String>> extend = new HashMap<>();
+        Map<String, List<Pair<Result.ResultLevel, String>>> alertMessages = result.getAlertMessages();
+        for (String evaluator : alertMessages.keySet()) {
+            List<Pair<Result.ResultLevel, String>> messages = alertMessages.get(evaluator);
+            extend.put(evaluator, new HashMap<>());
+            for (Pair<Result.ResultLevel, String> message : messages) {
+                LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]",
+                        analyzerJobEntity.getJobDefId(), message.getRight(), message.getLeft(), evaluator);
+                extend.get(evaluator).put(message.getRight(), message.getLeft().toString());
+            }
+        }
+
+        Map<String, Object> alertData = new HashMap<>();
+        alertData.put(Constants.ANALYZER_REPORT_DATA_BASIC_KEY, basic);
+        alertData.put(Constants.ANALYZER_REPORT_DATA_EXTEND_KEY, extend);
+
+        //TODO, override email config in job meta data
+        ApplicationEmailService emailService = new ApplicationEmailService(config, Constants.ANALYZER_REPORT_CONFIG_PATH);
+        String subject = String.format(Constants.ANALYZER_REPORT_SUBJECT, analyzerJobEntity.getJobDefId());
+        AlertEmailContext alertContext = emailService.buildEmailContext(subject);
+        emailService.onAlert(alertContext, alertData);
+    }
+
+    private String getJobLink(AnalyzerEntity analyzerJobEntity) {
+        return "http://"
+                + config.getString(Constants.HOST_PATH)
+                + ":"
+                + config.getInt(Constants.PORT_PATH)
+                + "/#/site/"
+                + analyzerJobEntity.getSiteId()
+                + "/jpm/detail/"
+                + analyzerJobEntity.getJobId();
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
new file mode 100644
index 0000000..2f42bf9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+
+public interface Publisher {
+    void publish(AnalyzerEntity analyzerJobEntity, Result result);
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
new file mode 100644
index 0000000..a12f589
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Result {
+    //for EagleStorePublisher
+    private TaggedLogAPIEntity alertEntity = null;//TODO
+    //for EmailPublisher
+    private Map<String, List<Pair<ResultLevel, String>>> alertMessages = new HashMap<>();
+
+    public void addEvaluatorResult(Class<?> type, EvaluatorResult result) {
+        Map<Class<?>, ProcessorResult> processorResults = result.getProcessorResults();
+        for (Class<?> processorType : processorResults.keySet()) {
+            ProcessorResult processorResult = processorResults.get(processorType);
+            if (processorResult.resultLevel.equals(ResultLevel.NONE)) {
+                continue;
+            }
+
+            String typeName = type.getName();
+            if (!alertMessages.containsKey(typeName)) {
+                alertMessages.put(typeName, new ArrayList<>());
+            }
+            alertMessages.get(typeName).add(Pair.of(processorResult.getResultLevel(), processorResult.getMessage()));
+        }
+    }
+
+    public TaggedLogAPIEntity getAlertEntity() {
+        return alertEntity;
+    }
+
+    public Map<String, List<Pair<ResultLevel, String>>> getAlertMessages() {
+        return alertMessages;
+    }
+
+    /**
+     * Processor result.
+     */
+
+    public enum ResultLevel {
+        NONE,
+        NOTICE,
+        WARNING,
+        CRITICAL
+    }
+
+    public static class ProcessorResult {
+        private ResultLevel resultLevel;
+        private String message;
+
+        public ProcessorResult(ResultLevel resultLevel, String message) {
+            this.resultLevel = resultLevel;
+            this.message = message;
+        }
+
+        public ResultLevel getResultLevel() {
+            return resultLevel;
+        }
+
+        public void setResultLevel(ResultLevel resultLevel) {
+            this.resultLevel = resultLevel;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+
+        public void setMessage(String message) {
+            this.message = message;
+        }
+    }
+
+    /**
+     * Evaluator result.
+     */
+    public static class EvaluatorResult {
+        private Map<Class<?>, ProcessorResult> processorResults = new HashMap<>();
+
+        public void addProcessorResult(Class<?> type, ProcessorResult result) {
+            this.processorResults.put(type, result);
+        }
+
+        public Map<Class<?>, ProcessorResult> getProcessorResults() {
+            return this.processorResults;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
new file mode 100644
index 0000000..4b18f7c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher.dedup;
+
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+public interface AlertDeduplicator {
+    boolean dedup(AnalyzerEntity analyzerJobEntity, Result result);
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
new file mode 100644
index 0000000..09f1af6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher.dedup.impl;
+
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
+import org.apache.eagle.jpm.analyzer.util.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * dedup by jobDefId.
+ */
+public class SimpleDeduplicator implements AlertDeduplicator, Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleDeduplicator.class);
+
+    private Map<String, Long> lastUpdateTime = new HashMap<>();
+
+    @Override
+    public boolean dedup(AnalyzerEntity analyzerJobEntity, Result result) {
+        long dedupInterval = Constants.DEFAULT_DEDUP_INTERVAL;
+        if (analyzerJobEntity.getJobMeta().containsKey(Constants.DEDUP_INTERVAL_KEY)) {
+            dedupInterval = (Long)analyzerJobEntity.getJobMeta().get(Constants.DEDUP_INTERVAL_KEY);
+        }
+
+        dedupInterval = dedupInterval * 1000;
+        long currentTimeStamp = System.currentTimeMillis();
+        if (lastUpdateTime.containsKey(analyzerJobEntity.getJobDefId())) {
+            if (lastUpdateTime.get(analyzerJobEntity.getJobDefId()) + dedupInterval > currentTimeStamp) {
+                return true;
+            } else {
+                return false;
+            }
+        } else {
+            lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp);
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
new file mode 100644
index 0000000..dc09202
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.resource;
+
+import com.google.inject.Inject;
+import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+import org.apache.eagle.metadata.resource.RESTResponse;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import java.util.List;
+
+import static org.apache.eagle.jpm.analyzer.util.Constants.*;
+
+@Path(ANALYZER_PATH)
+public class AnalyzerResource {
+    @Inject
+    MetaManagementService metaManagementService;
+
+    public AnalyzerResource() {
+    }
+
+    @POST
+    @Path(META_PATH)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<Void> addJobMeta(JobMetaEntity jobMetaEntity) {
+        return RESTResponse.<Void>async((response) -> {
+            jobMetaEntity.ensureDefault();
+            boolean ret = metaManagementService.addJobMeta(jobMetaEntity);
+            String message = "Successfully add job meta for " + jobMetaEntity.getJobDefId();
+            if (!ret) {
+                message = "Failed to add job meta for " + jobMetaEntity.getJobDefId();
+            }
+            response.success(ret).message(message);
+        }).get();
+    }
+
+    @POST
+    @Path(JOB_META_PATH)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<Void> updateJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId, JobMetaEntity jobMetaEntity) {
+        return RESTResponse.<Void>async((response) -> {
+            jobMetaEntity.ensureDefault();
+            boolean ret = metaManagementService.updateJobMeta(jobDefId, jobMetaEntity);
+            String message = "Successfully update job meta for " + jobDefId;
+            if (!ret) {
+                message = "Failed to update job meta for " + jobDefId;
+            }
+            response.success(ret).message(message);
+        }).get();
+    }
+
+    @GET
+    @Path(JOB_META_PATH)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<List<JobMetaEntity>> getJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) {
+        return RESTResponse.async(() -> metaManagementService.getJobMeta(jobDefId)).get();
+    }
+
+    @DELETE
+    @Path(JOB_META_PATH)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<Void> deleteJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) {
+        return RESTResponse.<Void>async((response) -> {
+            boolean ret = metaManagementService.deleteJobMeta(jobDefId);
+            String message = "Successfully delete job meta for " + jobDefId;
+            if (!ret) {
+                message = "Failed to delete job meta for " + jobDefId;
+            }
+
+            response.success(ret).message(message);
+        }).get();
+    }
+
+    @POST
+    @Path(PUBLISHER_PATH)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<Void> addPublisherMeta(PublisherEntity publisherEntity) {
+        return RESTResponse.<Void>async((response) -> {
+            publisherEntity.ensureDefault();
+            boolean ret = metaManagementService.addPublisherMeta(publisherEntity);
+            String message = "Successfully add publisher meta for " + publisherEntity.getUserId();
+            if (!ret) {
+                message = "Failed to add publisher meta for " + publisherEntity.getUserId();
+            }
+            response.success(ret).message(message);
+        }).get();
+    }
+
+    @DELETE
+    @Path(PUBLISHER_META_PATH)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<Void> deletePublisherMeta(@PathParam(USER_PATH) String userId) {
+        return RESTResponse.<Void>async((response) -> {
+            boolean ret = metaManagementService.deletePublisherMeta(userId);
+            String message = "Successfully delete publisher meta for " + userId;
+            if (!ret) {
+                message = "Failed to delete publisher meta for " + userId;
+            }
+            response.success(ret).message(message);
+        }).get();
+    }
+
+    @GET
+    @Path(PUBLISHER_META_PATH)
+    @Produces(MediaType.APPLICATION_JSON)
+    public RESTResponse<List<PublisherEntity>> getPublisherMeta(@PathParam(USER_PATH) String userId) {
+        return RESTResponse.async(() -> metaManagementService.getPublisherMeta(userId)).get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
new file mode 100644
index 0000000..774e6d2
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.analyzer.util;
+
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class Constants {
+    public static final String HOST_PATH = "service.host";
+    public static final String PORT_PATH = "service.port";
+    public static final String USERNAME_PATH = "service.username";
+    public static final String PASSWORD_PATH = "service.password";
+    public static final String CONTEXT_PATH = "service.context";
+    public static final String READ_TIMEOUT_PATH = "service.readTimeOutSeconds";
+
+    public static final String META_PATH = "/metadata";
+    public static final String ANALYZER_PATH = "/job/analyzer";
+    public static final String JOB_DEF_PATH = "jobDefId";
+    public static final String JOB_META_PATH = META_PATH + "/{" + JOB_DEF_PATH + "}";
+
+    public static final String PUBLISHER_PATH = "/publisher";
+    public static final String USER_PATH = "userId";
+    public static final String PUBLISHER_META_PATH = PUBLISHER_PATH + "/{" + USER_PATH + "}";
+
+    public static final String PROCESS_NONE = "PROCESS_NONE";
+
+    public static final String EVALUATOR_TIME_LENGTH_KEY = "evaluator.timeLength";
+    public static final int DEFAULT_EVALUATOR_TIME_LENGTH = 7;//7 days
+
+    public static final String ALERT_THRESHOLD_KEY = "alert.threshold";
+    public static final Map<Result.ResultLevel, Double> DEFAULT_ALERT_THRESHOLD = new HashMap<Result.ResultLevel, Double>() {
+        {
+            put(Result.ResultLevel.NOTICE, 0.1);
+            put(Result.ResultLevel.WARNING, 0.3);
+            put(Result.ResultLevel.CRITICAL, 0.5);
+        }
+    };
+
+    public static final String DEDUP_INTERVAL_KEY = "alert.dedupInterval"; //seconds
+    public static final int DEFAULT_DEDUP_INTERVAL = 300;
+
+    public static final String ANALYZER_REPORT_CONFIG_PATH = "application.analyzerReport";
+    public static final String ANALYZER_REPORT_SUBJECT = "Job Performance Alert For Job: %s";
+
+    public static final String ANALYZER_REPORT_DATA_BASIC_KEY = "basic";
+    public static final String ANALYZER_REPORT_DATA_EXTEND_KEY = "extend";
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java
new file mode 100644
index 0000000..66f7622
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.analyzer.util;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
+import org.apache.eagle.metadata.resource.RESTResponse;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.net.URLEncoder;
+import java.util.*;
+
+public class Utils {
+    private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+    static {
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+    }
+
+    public static List<JobMetaEntity> getJobMeta(Config config, String jobDefId) {
+        List<JobMetaEntity> result = new ArrayList<>();
+        String url = "http://"
+                + config.getString(Constants.HOST_PATH)
+                + ":"
+                + config.getInt(Constants.PORT_PATH)
+                + config.getString(Constants.CONTEXT_PATH)
+                + Constants.ANALYZER_PATH
+                + Constants.META_PATH
+                + "/"
+                + URLEncoder.encode(jobDefId);
+
+        InputStream is = null;
+        try {
+            is = InputStreamUtils.getInputStream(url, null, org.apache.eagle.jpm.util.Constants.CompressionType.NONE);
+            LOG.info("get job meta from {}", url);
+            result = (List<JobMetaEntity>)OBJ_MAPPER.readValue(is, RESTResponse.class).getData();
+        } catch (Exception e) {
+            LOG.warn("failed to get job meta from {}", url, e);
+        } finally {
+            org.apache.eagle.jpm.util.Utils.closeInputStream(is);
+            return result;
+        }
+    }
+
+    public static <K, V extends Comparable<? super V>> List<Map.Entry<K, V>> sortByValue(Map<K, V> map) {
+        List<Map.Entry<K, V>> list = new LinkedList<>(map.entrySet());
+        Collections.sort(list, (e1, e2) -> e1.getValue().compareTo(e2.getValue()));
+        Collections.reverse(list);
+        return list;
+    }
+}