You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/01/12 02:23:29 UTC
[1/2] eagle git commit: [EAGLE-797] add job performance analysis
Repository: eagle
Updated Branches:
refs/heads/master 2adbbf59f -> b4695801f
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
new file mode 100644
index 0000000..39cec68
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm
@@ -0,0 +1,131 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+<head>
+ <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
+ <meta name="viewport" content="width=device-width"/>
+ <style type="text/css">
+ img {
+ max-width: 100%;
+ }
+
+ body {
+ -webkit-font-smoothing: antialiased;
+ -webkit-text-size-adjust: none;
+ width: 100% !important;
+ height: 100%;
+ line-height: 1.6em;
+ }
+
+ body {
+ background-color: #f6f6f6;
+ }
+
+ @media only screen and (max-width: 640px) {
+ body {
+ padding: 0 !important;
+ }
+
+ h1 {
+ font-weight: 800 !important;
+ margin: 20px 0 5px !important;
+ }
+
+ h2 {
+ font-weight: 800 !important;
+ margin: 20px 0 5px !important;
+ }
+
+ h3 {
+ font-weight: 800 !important;
+ margin: 20px 0 5px !important;
+ }
+
+ h4 {
+ font-weight: 800 !important;
+ margin: 20px 0 5px !important;
+ }
+
+ h1 {
+ font-size: 22px !important;
+ }
+
+ h2 {
+ font-size: 18px !important;
+ }
+
+ h3 {
+ font-size: 14px !important;
+ }
+
+ .container {
+ padding: 0 !important;
+ width: 100% !important;
+ }
+
+ .content {
+ padding: 0 !important;
+ }
+
+ .content-wrap {
+ padding: 10px !important;
+ }
+
+ .invoice {
+ width: 100% !important;
+ }
+ }
+ </style>
+</head>
+<body>
+ #set ( $elem = $alertList[0] )
+
+<p><b>Basic Information: </b></p>
+
+<ul>
+ <li>Site: ${elem["basic"].get("site")}</li>
+ <li>Job Name: ${elem["basic"].get("name")}</li>
+ <li>User: ${elem["basic"].get("user")}</li>
+ <li>Job Status: ${elem["basic"].get("status")}</li>
+ <li>Start Time: ${elem["basic"].get("start")}</li>
+ <li>End Time: ${elem["basic"].get("end")}</li>
+ <li>Duration Time: ${elem["basic"].get("duration")}</li>
+ <li>Progress: ${elem["basic"].get("progress")}</li>
+ <li>Job Detail: <a href=${elem["basic"].get("detail")}>${elem["basic"].get("detail")}</a></li>
+</ul>
+
+<p><b>Analyzer Results: </b></p>
+
+#foreach($evaluator in ${elem["extend"].keySet()})
+<table class="body-wrap" style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; width: 100%; background-color: #f6f6f6; margin: 0;" bgcolor="#f6f6f6" border="1">
+ <caption><b>Analysis By $evaluator</b></caption>
+ <tr>
+ <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="100"><b>level</b></td>
+ <th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="250"><b>message</b></th>
+ </tr>
+ #foreach($message in ${elem["extend"].get($evaluator).keySet()})
+ <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+ <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;"><b>${elem["extend"].get($evaluator).get($message)}</b></td>
+ <th style="...">$message</th>
+ </tr>
+ #end
+</table>
+#end
+
+</body>
+</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql
new file mode 100644
index 0000000..3d790d0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql
@@ -0,0 +1,23 @@
+CREATE TABLE IF NOT EXISTS jobs (
+ jobDefId VARCHAR(50) NOT NULL,
+ configuration MEDIUMTEXT NOT NULL,
+ createdtime bigint(20) DEFAULT NULL,
+ modifiedtime bigint(20) DEFAULT NULL,
+ PRIMARY KEY (jobDefId)
+);
+
+CREATE TABLE IF NOT EXISTS job_evaluators (
+ jobDefId VARCHAR(50) NOT NULL,
+ evaluator VARCHAR(100) NOT NULL,
+ createdtime bigint(20) DEFAULT NULL,
+ modifiedtime bigint(20) DEFAULT NULL,
+ PRIMARY KEY (jobDefId, evaluator)
+);
+
+CREATE TABLE IF NOT EXISTS job_publishments (
+ userId VARCHAR(100) PRIMARY KEY,
+ mailAddress mediumtext NOT NULL,
+ createdtime bigint(20) DEFAULT NULL,
+ modifiedtime bigint(20) DEFAULT NULL,
+ PRIMARY KEY (userId)
+);
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
index 6f337e7..c3d08d4 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
@@ -89,7 +89,6 @@ public class MRHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBa
} catch (Exception e) {
return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
} finally {
- client.getJerseyClient().destroy();
try {
client.close();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
index 6dc5791..01f98c0 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
@@ -84,7 +84,6 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
} finally {
list.clear();
jobConfigurationEntity = null;
- client.getJerseyClient().destroy();
client.close();
}
tried++;
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index 80b4049..2f77456 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -97,7 +97,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
eagleServiceConfig.username,
eagleServiceConfig.password);
- client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+ client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
logger.info("start flushing entities of total number " + list.size());
List<GenericMetricEntity> metricEntities = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
@@ -167,7 +167,6 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr
logger.info("finish flushing entities of total number " + list.size());
list.clear();
- client.getJerseyClient().destroy();
client.close();
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
index 666b3db..856f051 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java
@@ -120,7 +120,7 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
eagleServiceConfig.username,
eagleServiceConfig.password);
- client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+ client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
List<TaskAttemptCounterAPIEntity> list = new ArrayList<>();
logger.info("start flushing TaskAttemptCounter entities of total number " + counters.size());
// create entity
@@ -149,7 +149,6 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe
logger.info("end flushing TaskAttemptCounter entities of total number " + counters.size());
counters.clear();
list.clear();
- client.getJerseyClient().destroy();
client.close();
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index 40e6432..794f992 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -139,7 +139,6 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
}
tried++;
}
- client.getJerseyClient().destroy();
client.close();
}
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/pom.xml b/eagle-jpm/eagle-jpm-mr-running/pom.xml
index f6f1be5..02f2bc4 100644
--- a/eagle-jpm/eagle-jpm-mr-running/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/pom.xml
@@ -79,6 +79,16 @@
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-jpm-analyzer</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-metadata-jdbc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
<resources>
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
index de0d846..309146e 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
@@ -18,6 +18,7 @@ package org.apache.eagle.jpm.mr.running;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt;
import org.apache.eagle.jpm.util.Constants;
@@ -67,7 +68,8 @@ public class MRRunningJobApplication extends StormApplication {
mrRunningJobConfig.getEndpointConfig(),
mrRunningJobConfig.getZkStateConfig(),
confKeyKeys,
- config),
+ config,
+ new MRJobPerformanceAnalyzer(config)),
tasks).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId"));
return topologyBuilder.createTopology();
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
index 5a57aca..6670282 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java
@@ -18,6 +18,11 @@ package org.apache.eagle.jpm.mr.running;
import org.apache.eagle.app.service.ApplicationListener;
import org.apache.eagle.app.spi.AbstractApplicationProvider;
+import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.impl.MetaManagementServiceJDBCImpl;
+import org.apache.eagle.jpm.analyzer.meta.impl.MetaManagementServiceMemoryImpl;
+import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
+import org.apache.eagle.metadata.store.jdbc.JDBCMetadataStore;
import java.util.Optional;
@@ -31,4 +36,10 @@ public class MRRunningJobApplicationProvider extends AbstractApplicationProvider
public Optional<ApplicationListener> getApplicationListener() {
return Optional.of(new MRRunningJobApplicationListener());
}
+
+ @Override
+ protected void onRegister() {
+ bind(MemoryMetadataStore.class, MetaManagementService.class, MetaManagementServiceMemoryImpl.class);
+ bind(JDBCMetadataStore.class, MetaManagementService.class, MetaManagementServiceJDBCImpl.class);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
index c2cbbe5..2bc1648 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
@@ -85,7 +85,6 @@ public class MRJobEntityCreationHandler {
LOG.warn("exception found when flush entities, {}", e);
return false;
} finally {
- client.getJerseyClient().destroy();
try {
client.close();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 52c1866..c21eaf1 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -19,6 +19,8 @@
package org.apache.eagle.jpm.mr.running.parser;
import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
import org.apache.eagle.jpm.mr.runningentity.JobConfig;
@@ -82,6 +84,7 @@ public class MRJobParser implements Runnable {
private static final int FLUSH_TASKS_EVERY_TIME = 5;
private static final int MAX_TASKS_PERMIT = 5000;
private Config config;
+ private MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer;
static {
OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
@@ -92,7 +95,8 @@ public class MRJobParser implements Runnable {
AppInfo app, Map<String, JobExecutionAPIEntity> mrJobMap,
MRRunningJobManager runningJobManager, ResourceFetcher rmResourceFetcher,
List<String> configKeys,
- Config config) {
+ Config config,
+ MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer) {
this.app = app;
if (mrJobMap == null) {
this.mrJobEntityMap = new HashMap<>();
@@ -112,6 +116,7 @@ public class MRJobParser implements Runnable {
this.finishedTaskIds = new HashSet<>();
this.configKeys = configKeys;
this.config = config;
+ this.mrJobPerformanceAnalyzer = mrJobPerformanceAnalyzer;
}
public void setAppInfo(AppInfo app) {
@@ -168,6 +173,7 @@ public class MRJobParser implements Runnable {
break;
}
}
+ mrJobPerformanceAnalyzer.analysis(convertToAnalysisEntity(mrJobEntityMap.get(jobId)));
}
}
@@ -586,4 +592,21 @@ public class MRJobParser implements Runnable {
}
}
}
+
+ private AnalyzerEntity convertToAnalysisEntity(JobExecutionAPIEntity jobExecutionAPIEntity) {
+ AnalyzerEntity mrJobAnalysisEntity = new AnalyzerEntity();
+ Map<String, String> tags = jobExecutionAPIEntity.getTags();
+ mrJobAnalysisEntity.setJobDefId(tags.get(MRJobTagName.JOD_DEF_ID.toString()));
+ mrJobAnalysisEntity.setJobId(tags.get(MRJobTagName.JOB_ID.toString()));
+ mrJobAnalysisEntity.setSiteId(tags.get(MRJobTagName.SITE.toString()));
+ mrJobAnalysisEntity.setUserId(tags.get(MRJobTagName.USER.toString()));
+
+ mrJobAnalysisEntity.setStartTime(jobExecutionAPIEntity.getStartTime());
+ mrJobAnalysisEntity.setEndTime(jobExecutionAPIEntity.getEndTime());
+ mrJobAnalysisEntity.setDurationTime(jobExecutionAPIEntity.getDurationTime());
+ mrJobAnalysisEntity.setCurrentState(jobExecutionAPIEntity.getInternalState());
+ mrJobAnalysisEntity.setJobConfig(new HashMap<>(jobExecutionAPIEntity.getJobConfig()));
+ mrJobAnalysisEntity.setProgress(this.app.getProgress());
+ return mrJobAnalysisEntity;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
index 8ec2dec..915df8a 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
@@ -19,6 +19,7 @@
package org.apache.eagle.jpm.mr.running.storm;
import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
import org.apache.eagle.jpm.mr.running.parser.MRJobParser;
import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
@@ -51,18 +52,21 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
private ResourceFetcher resourceFetcher;
private List<String> configKeys;
private Config config;
+ private MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer;
public MRRunningJobParseBolt(MRRunningJobConfig.EagleServiceConfig eagleServiceConfig,
MRRunningJobConfig.EndpointConfig endpointConfig,
MRRunningJobConfig.ZKStateConfig zkStateConfig,
List<String> configKeys,
- Config config) {
+ Config config,
+ MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer) {
this.eagleServiceConfig = eagleServiceConfig;
this.endpointConfig = endpointConfig;
this.runningMRParsers = new HashMap<>();
this.zkStateConfig = zkStateConfig;
this.configKeys = configKeys;
this.config = config;
+ this.mrJobPerformanceAnalyzer = mrJobPerformanceAnalyzer;
}
@Override
@@ -83,7 +87,8 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
MRJobParser applicationParser;
if (!runningMRParsers.containsKey(appInfo.getId())) {
applicationParser = new MRJobParser(endpointConfig, eagleServiceConfig,
- appInfo, mrJobs, runningJobManager, this.resourceFetcher, configKeys, this.config);
+ appInfo, mrJobs, runningJobManager, this.resourceFetcher, configKeys, this.config,
+ mrJobPerformanceAnalyzer);
runningMRParsers.put(appInfo.getId(), applicationParser);
LOG.info("create application parser for {}", appInfo.getId());
} else {
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
index 787c9ac..fc674d6 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java
@@ -23,6 +23,7 @@ import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
import org.apache.eagle.jpm.mr.running.parser.MRJobParser;
import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout;
@@ -88,7 +89,8 @@ public class MRRunningJobApplicationTest {
mrRunningJobConfig.getEndpointConfig(),
mrRunningJobConfig.getZkStateConfig(),
confKeyKeys,
- config);
+ config,
+ new MRJobPerformanceAnalyzer(config));
MRRunningJobManager mrRunningJobManager = mock(MRRunningJobManager.class);
PowerMockito.whenNew(MRRunningJobManager.class).withArguments(mrRunningJobConfig.getZkStateConfig()).thenReturn(mrRunningJobManager);
mrRunningJobParseBolt.prepare(null, null, null);
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
index e0b5533..7046f8b 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -24,6 +24,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
+import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer;
import org.apache.eagle.jpm.mr.running.MRRunningJobConfig;
import org.apache.eagle.jpm.mr.running.parser.metrics.JobExecutionMetricsCreationListener;
import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager;
@@ -131,7 +132,7 @@ public class MRJobParserTest {
MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
- app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+ app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -186,7 +187,7 @@ public class MRJobParserTest {
MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
- app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+ app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -228,7 +229,7 @@ public class MRJobParserTest {
MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
- app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+ app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -272,7 +273,7 @@ public class MRJobParserTest {
MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig());
RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls);
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
- app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+ app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -318,7 +319,7 @@ public class MRJobParserTest {
RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class);
when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList());
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
- app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+ app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -357,7 +358,7 @@ public class MRJobParserTest {
eagleServiceConfig.username,
eagleServiceConfig.password).thenReturn(client);
when(client.create(any())).thenThrow(Exception.class).thenReturn(null);
- when(client.getJerseyClient()).thenReturn(new Client());
+ //when(client.getJerseyClient()).thenReturn(new Client());
mockInputJobSteam("/mrjob_30784.json", JOB_URL);
mockInputJobSteamWithException(JOB_COUNT_URL);
mockGetConnection("/mrconf_30784.xml");
@@ -377,7 +378,7 @@ public class MRJobParserTest {
RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class);
when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList());
MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(),
- app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config);
+ app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config));
Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser);
@@ -401,7 +402,7 @@ public class MRJobParserTest {
Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) != null);
Assert.assertTrue(entities.isEmpty());
verify(client, times(2)).create(any());
- verify(client, times(1)).getJerseyClient();
+ //verify(client, times(1)).getJerseyClient();
verify(client, times(1)).close();
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
index fdfcaad..8127aa2 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java
@@ -50,7 +50,7 @@ public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthChec
eagleServiceConfig.username,
eagleServiceConfig.password);
- client.getJerseyClient().setReadTimeout(eagleServiceConfig.timeout * 1000);
+ client.setReadTimeout(eagleServiceConfig.timeout * 1000);
String message = "";
try {
@@ -88,7 +88,6 @@ public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthChec
} catch (Exception e) {
return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
} finally {
- client.getJerseyClient().destroy();
try {
client.close();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
index 05e35e4..2ef1bd9 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
@@ -708,7 +708,7 @@ public class JHFSparkEventReader {
config.eagleInfo.username,
config.eagleInfo.password);
int timeout = config.eagleInfo.timeout;
- client.getJerseyClient().setReadTimeout(timeout * 1000);
+ client.setReadTimeout(timeout * 1000);
return client;
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
index 92adfa8..adef27f 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java
@@ -57,7 +57,7 @@ public class SparkAppEntityCreationHandler {
eagleServiceConfig.eagleServicePort,
eagleServiceConfig.username,
eagleServiceConfig.password)) {
- client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+ client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
LOG.info("start to flush spark app entities, size {}", entities.size());
client.create(entities);
LOG.info("finish flushing spark app entities, size {}", entities.size());
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
index 9025d36..a065373 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java
@@ -39,7 +39,7 @@ public class Utils {
try {
is.close();
} catch (Exception e) {
- e.printStackTrace();
+ LOG.warn("{}", e);
}
}
}
@@ -48,7 +48,7 @@ public class Utils {
try {
Thread.sleep(seconds * 1000);
} catch (Exception e) {
- e.printStackTrace();
+ LOG.warn("{}", e);
}
}
@@ -60,7 +60,7 @@ public class Utils {
Date parsedDate = dateFormat.parse(date);
timestamp = parsedDate.getTime();
} catch (ParseException e) {
- e.printStackTrace();
+ LOG.warn("{}", e);
}
if (timestamp == 0L) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/pom.xml b/eagle-jpm/pom.xml
index 1ffdb03..e7ae3c3 100644
--- a/eagle-jpm/pom.xml
+++ b/eagle-jpm/pom.xml
@@ -32,6 +32,7 @@
<packaging>pom</packaging>
<modules>
+ <module>eagle-jpm-analyzer</module>
<module>eagle-jpm-spark-running</module>
<module>eagle-jpm-spark-history</module>
<module>eagle-jpm-mr-history</module>
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-server-assembly/src/main/conf/eagle.conf
----------------------------------------------------------------------
diff --git a/eagle-server-assembly/src/main/conf/eagle.conf b/eagle-server-assembly/src/main/conf/eagle.conf
index af7e14a..a889914 100644
--- a/eagle-server-assembly/src/main/conf/eagle.conf
+++ b/eagle-server-assembly/src/main/conf/eagle.conf
@@ -108,6 +108,12 @@ application {
recipients: "nobody@abc.com"
template: "JobReportTemplate.vm"
}
+ analyzerReport {
+ sender: "nobody@abc.com"
+ recipients: "nobody@abc.com"
+ template: "AnalyzerReportTemplate.vm"
+ cc: "nobody@abc.com"
+ }
}
# ---------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
index bf5e695..950bb04 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java
@@ -50,7 +50,7 @@ public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckB
topologyCheckAppConfig.getConfig().getString("service.username"),
topologyCheckAppConfig.getConfig().getString("service.password"));
- client.getJerseyClient().setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000);
+ client.setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000);
String message = "";
try {
@@ -80,7 +80,6 @@ public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckB
} catch (Exception e) {
return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
} finally {
- client.getJerseyClient().destroy();
try {
client.close();
} catch (Exception e) {
[2/2] eagle git commit: [EAGLE-797] add job performance analysis
Posted by qi...@apache.org.
[EAGLE-797] add job performance analysis
for continuous development
Author: wujinhu <wu...@126.com>
Closes #762 from wujinhu/EAGLE-797.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/b4695801
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/b4695801
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/b4695801
Branch: refs/heads/master
Commit: b4695801f6c93ca0991fd9758c470b784aa8c781
Parents: 2adbbf5
Author: wujinhu <wu...@126.com>
Authored: Thu Jan 12 10:23:02 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Thu Jan 12 10:23:02 2017 +0800
----------------------------------------------------------------------
.../app/spi/AbstractApplicationProvider.java | 3 +-
...adoopQueueRunningApplicationHealthCheck.java | 3 +-
.../jpm/aggregation/mr/MRMetricAggregator.java | 3 +
.../mr/MRMetricsAggregateContainer.java | 1 +
eagle-jpm/eagle-jpm-analyzer/pom.xml | 38 ++++++
.../eagle/jpm/analyzer/AnalyzerEntity.java | 130 ++++++++++++++++++
.../apache/eagle/jpm/analyzer/Evaluator.java | 24 ++++
.../apache/eagle/jpm/analyzer/JobAnalyzer.java | 28 ++++
.../apache/eagle/jpm/analyzer/Processor.java | 24 ++++
.../analyzer/meta/MetaManagementService.java | 39 ++++++
.../impl/MetaManagementServiceJDBCImpl.java | 77 +++++++++++
.../impl/MetaManagementServiceMemoryImpl.java | 127 ++++++++++++++++++
.../jpm/analyzer/meta/model/JobMetaEntity.java | 85 ++++++++++++
.../analyzer/meta/model/PublisherEntity.java | 77 +++++++++++
.../analyzer/mr/MRJobPerformanceAnalyzer.java | 65 +++++++++
.../jpm/analyzer/mr/sla/SLAJobEvaluator.java | 67 ++++++++++
.../sla/processors/LongStuckJobProcessor.java | 43 ++++++
.../UnExpectedLongDurationJobProcessor.java | 120 +++++++++++++++++
.../mr/suggestion/JobSuggestionEvaluator.java | 44 +++++++
.../analyzer/publisher/EagleStorePublisher.java | 40 ++++++
.../jpm/analyzer/publisher/EmailPublisher.java | 109 +++++++++++++++
.../eagle/jpm/analyzer/publisher/Publisher.java | 24 ++++
.../eagle/jpm/analyzer/publisher/Result.java | 109 +++++++++++++++
.../publisher/dedup/AlertDeduplicator.java | 25 ++++
.../dedup/impl/SimpleDeduplicator.java | 59 +++++++++
.../jpm/analyzer/resource/AnalyzerResource.java | 131 +++++++++++++++++++
.../eagle/jpm/analyzer/util/Constants.java | 65 +++++++++
.../apache/eagle/jpm/analyzer/util/Utils.java | 74 +++++++++++
.../main/resources/AnalyzerReportTemplate.vm | 131 +++++++++++++++++++
.../src/main/resources/createTable.sql | 23 ++++
.../MRHistoryJobApplicationHealthCheck.java | 1 -
...JobConfigurationCreationServiceListener.java | 1 -
.../JobEntityCreationEagleServiceListener.java | 3 +-
.../parser/TaskAttemptCounterListener.java | 3 +-
.../mr/history/parser/TaskFailureListener.java | 1 -
eagle-jpm/eagle-jpm-mr-running/pom.xml | 10 ++
.../jpm/mr/running/MRRunningJobApplication.java | 4 +-
.../MRRunningJobApplicationProvider.java | 11 ++
.../parser/MRJobEntityCreationHandler.java | 1 -
.../jpm/mr/running/parser/MRJobParser.java | 25 +++-
.../mr/running/storm/MRRunningJobParseBolt.java | 9 +-
.../mr/running/MRRunningJobApplicationTest.java | 4 +-
.../jpm/mr/running/parser/MRJobParserTest.java | 17 +--
.../SparkHistoryJobApplicationHealthCheck.java | 3 +-
.../history/crawl/JHFSparkEventReader.java | 2 +-
.../parser/SparkAppEntityCreationHandler.java | 2 +-
.../java/org/apache/eagle/jpm/util/Utils.java | 6 +-
eagle-jpm/pom.xml | 1 +
eagle-server-assembly/src/main/conf/eagle.conf | 6 +
.../TopologyCheckApplicationHealthCheck.java | 3 +-
50 files changed, 1868 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
index 2a8d7c0..e537643 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
@@ -19,6 +19,7 @@ package org.apache.eagle.app.spi;
import com.google.common.base.Preconditions;
import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.app.Application;
import org.apache.eagle.app.service.ApplicationListener;
@@ -130,7 +131,7 @@ public abstract class AbstractApplicationProvider<T extends Application> impleme
currentRegistry.register(scope, new AbstractModule() {
@Override
protected void configure() {
- bind(type).to(impl);
+ bind(type).to(impl).in(Singleton.class);
}
});
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
index 6471dfc..5a5d0ee 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
@@ -52,7 +52,7 @@ public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthC
eagleServiceConfig.eagleService.username,
eagleServiceConfig.eagleService.password);
- client.getJerseyClient().setReadTimeout(60000);
+ client.setReadTimeout(60000);
String message = "";
try {
@@ -91,7 +91,6 @@ public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthC
} catch (Exception e) {
return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
} finally {
- client.getJerseyClient().destroy();
try {
client.close();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
index f8840b2..54bd29b 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
@@ -74,6 +74,8 @@ public class MRMetricAggregator implements MetricAggregator, Serializable {
.endTime(endTime)
.pageSize(Integer.MAX_VALUE)
.send();
+
+ client.close();
} catch (Exception e) {
LOG.warn("{}", e);
return false;
@@ -151,6 +153,7 @@ public class MRMetricAggregator implements MetricAggregator, Serializable {
client.create(entities);
LOG.info("finish flushing entities of total number " + entities.size());
entities.clear();
+ client.close();
} catch (Exception e) {
LOG.warn("{}", e);
return false;
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
index 45bbcef..dd1980b 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
@@ -74,6 +74,7 @@ public class MRMetricsAggregateContainer implements MetricsAggregateContainer, S
List<Map<List<String>, List<Double>>> results = response.getObj();
long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+ client.close();
return currentProcessTimeStamp;
} catch (Exception e) {
LOG.warn("{}", e);
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/pom.xml b/eagle-jpm/eagle-jpm-analyzer/pom.xml
new file mode 100644
index 0000000..d6383b6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>eagle-jpm-parent</artifactId>
+ <groupId>org.apache.eagle</groupId>
+ <version>0.5.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>eagle-jpm-analyzer</artifactId>
+ <name>Eagle::App::JPM::Analyzer</name>
+ <url>http://maven.apache.org</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-metadata-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-jpm-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-app-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
new file mode 100644
index 0000000..f9b7af0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * will refactor later if other types of job needs this.
+ * AnalyzerEntity for each job needed to be analysised
+ */
+public class AnalyzerEntity {
+ private String jobDefId;
+ private String jobId;
+ private String siteId;
+ private String userId;
+
+ private long startTime;
+ private long endTime;
+ private long durationTime;
+ private String currentState;
+ private double progress;
+
+ private Map<String, Object> jobConfig = new HashMap<>();
+
+ private Map<String, Object> jobMeta = new HashMap<>();
+
+ public String getJobDefId() {
+ return jobDefId;
+ }
+
+ public void setJobDefId(String jobDefId) {
+ this.jobDefId = jobDefId;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(String jobId) {
+ this.jobId = jobId;
+ }
+
+ public String getSiteId() {
+ return siteId;
+ }
+
+ public void setSiteId(String siteId) {
+ this.siteId = siteId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ }
+
+ public long getDurationTime() {
+ return durationTime;
+ }
+
+ public void setDurationTime(long durationTime) {
+ this.durationTime = durationTime;
+ }
+
+ public String getCurrentState() {
+ return currentState;
+ }
+
+ public void setCurrentState(String currentState) {
+ this.currentState = currentState;
+ }
+
+ public Map<String, Object> getJobConfig() {
+ return jobConfig;
+ }
+
+ public void setJobConfig(Map<String, Object> jobConfig) {
+ this.jobConfig = jobConfig;
+ }
+
+ public Map<String, Object> getJobMeta() {
+ return jobMeta;
+ }
+
+ public void setJobMeta(Map<String, Object> jobMeta) {
+ this.jobMeta = jobMeta;
+ }
+
+ public double getProgress() {
+ return progress;
+ }
+
+ public void setProgress(double progress) {
+ this.progress = progress;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
new file mode 100644
index 0000000..6617916
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+public interface Evaluator {
+ Result.EvaluatorResult evaluate(AnalyzerEntity analyzerEntity);
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
new file mode 100644
index 0000000..6cda1cd
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+/**
+ * Each JobAnalyzer contains one or more Evaluators to analysis each job.
+ * Each Evaluator is a group of Processors
+ * Each Processor implements an algorithm or a model to analysis one dimension of a job
+ *
+ */
+public interface JobAnalyzer {
+ void analysis(AnalyzerEntity analyzerEntity) throws Exception;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
new file mode 100644
index 0000000..d5a8a74
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+public interface Processor {
+ Result.ProcessorResult process(AnalyzerEntity jobAnalysisEntity);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
new file mode 100644
index 0000000..0935266
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta;
+
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+
+import java.util.List;
+
+public interface MetaManagementService {
+ boolean addJobMeta(JobMetaEntity jobMetaEntity);
+
+ boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity);
+
+ List<JobMetaEntity> getJobMeta(String jobDefId);
+
+ boolean deleteJobMeta(String jobDefId);
+
+ boolean addPublisherMeta(PublisherEntity publisherEntity);
+
+ boolean deletePublisherMeta(String userId);
+
+ List<PublisherEntity> getPublisherMeta(String userId);
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
new file mode 100644
index 0000000..cfb5029
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.impl;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class MetaManagementServiceJDBCImpl implements MetaManagementService, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceJDBCImpl.class);
+
+ @Inject
+ Config config;
+
+ @Override
+ public boolean addJobMeta(JobMetaEntity jobMetaEntity) {
+
+ return true;
+ }
+
+ @Override
+ public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) {
+
+ return true;
+ }
+
+ @Override
+ public List<JobMetaEntity> getJobMeta(String jobDefId) {
+
+ return null;
+ }
+
+ @Override
+ public boolean deleteJobMeta(String jobDefId) {
+
+ return true;
+ }
+
+ @Override
+ public boolean addPublisherMeta(PublisherEntity publisherEntity) {
+
+ return true;
+ }
+
+ @Override
+ public boolean deletePublisherMeta(String userId) {
+
+ return true;
+ }
+
+ @Override
+ public List<PublisherEntity> getPublisherMeta(String userId) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
new file mode 100644
index 0000000..85e8358
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.impl;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.*;
+
+public class MetaManagementServiceMemoryImpl implements MetaManagementService, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceMemoryImpl.class);
+
+ private final Map<String, JobMetaEntity> jobMetaEntities = new HashMap<>();
+ private final Map<String, List<PublisherEntity>> publisherEntities = new HashMap<>();
+
+ @Inject
+ Config config;
+
+ @Override
+ public boolean addJobMeta(JobMetaEntity jobMetaEntity) {
+ if (jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) {
+ LOG.warn("contains job {} already, add job meta failed", jobMetaEntity.getJobDefId());
+ return false;
+ }
+
+ jobMetaEntities.put(jobMetaEntity.getJobDefId(), jobMetaEntity);
+ LOG.info("Successfully add job {} meta", jobMetaEntity.getJobDefId());
+ return true;
+ }
+
+ @Override
+ public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) {
+ if (!jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) {
+ LOG.warn("does not contain job {}, update job meta failed", jobDefId);
+ return false;
+ }
+
+ jobMetaEntities.put(jobDefId, jobMetaEntity);
+ LOG.info("Successfully update job {} meta", jobDefId);
+ return true;
+ }
+
+ @Override
+ public List<JobMetaEntity> getJobMeta(String jobDefId) {
+ if (!jobMetaEntities.containsKey(jobDefId)) {
+ LOG.warn("does not contain job {}, get job meta failed", jobDefId);
+ return new ArrayList<>();
+ }
+
+ return Arrays.asList(jobMetaEntities.get(jobDefId));
+ }
+
+ @Override
+ public boolean deleteJobMeta(String jobDefId) {
+ if (!jobMetaEntities.containsKey(jobDefId)) {
+ LOG.warn("does not contain job {}, delete job meta failed", jobDefId);
+ return false;
+ }
+
+ jobMetaEntities.remove(jobDefId);
+ LOG.info("Successfully delete job {} meta", jobDefId);
+ return true;
+ }
+
+ @Override
+ public boolean addPublisherMeta(PublisherEntity publisherEntity) {
+ if (publisherEntities.containsKey(publisherEntity.getUserId())) {
+ for (PublisherEntity entity : publisherEntities.get(publisherEntity.getUserId())) {
+ if (entity.equals(publisherEntity)) {
+ LOG.warn("contains user {}, mailAddress {} already, add publisher failed", entity.getUserId(), entity.getMailAddress());
+ return false;
+ }
+ }
+ }
+
+ if (!publisherEntities.containsKey(publisherEntity.getUserId())) {
+ publisherEntities.put(publisherEntity.getUserId(), new ArrayList<>());
+ }
+
+ publisherEntities.get(publisherEntity.getUserId()).add(publisherEntity);
+ LOG.info("Successfully add publisher user {}, mailAddress {}", publisherEntity.getUserId(), publisherEntity.getMailAddress());
+ return true;
+ }
+
+ @Override
+ public boolean deletePublisherMeta(String userId) {
+ if (!publisherEntities.containsKey(userId)) {
+ LOG.warn("does not contain user {}, failed to delete publisher", userId);
+ return false;
+ }
+
+ publisherEntities.remove(userId);
+ LOG.info("Successfully delete publisher user " + userId);
+ return true;
+ }
+
+ @Override
+ public List<PublisherEntity> getPublisherMeta(String userId) {
+ if (!publisherEntities.containsKey(userId)) {
+ LOG.warn("does not contain user {}, failed to get publisher", userId);
+ return new ArrayList<>();
+ }
+
+ return publisherEntities.get(userId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
new file mode 100644
index 0000000..2e15c17
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.model;
+
+import org.apache.eagle.metadata.persistence.PersistenceEntity;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class JobMetaEntity extends PersistenceEntity {
+ private String jobDefId;
+ private String siteId;
+ private Map<String, Object> configuration = new HashMap<>();
+ private Set<String> evaluators = new HashSet<>();
+
+ public JobMetaEntity() {
+
+ }
+
+ public JobMetaEntity(String jobDefId,
+ String siteId,
+ Map<String, Object> configuration,
+ Set<String> evaluators) {
+ this.jobDefId = jobDefId;
+ this.siteId = siteId;
+ this.configuration = configuration;
+ this.evaluators = evaluators;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("JobMetaEntity[jobDefId=%s, siteId=%s]", jobDefId, siteId);
+ }
+
+ public String getJobDefId() {
+ return jobDefId;
+ }
+
+ public void setJobDefId(String jobDefId) {
+ this.jobDefId = jobDefId;
+ }
+
+ public String getSiteId() {
+ return siteId;
+ }
+
+ public void setSiteId(String siteId) {
+ this.siteId = siteId;
+ }
+
+ public Map<String, Object> getConfiguration() {
+ return configuration;
+ }
+
+ public void setConfiguration(Map<String, Object> configuration) {
+ this.configuration = configuration;
+ }
+
+ public Set<String> getEvaluators() {
+ return evaluators;
+ }
+
+ public void setEvaluators(Set<String> evaluators) {
+ this.evaluators = evaluators;
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java
new file mode 100644
index 0000000..bca7ab1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.model;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.metadata.persistence.PersistenceEntity;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PublisherEntity extends PersistenceEntity {
+ private String userId;
+ private String mailAddress;
+
+ public PublisherEntity(String userId, String mailAddress) {
+ this.userId = userId;
+ this.mailAddress = mailAddress;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("PublisherEntity[userId=%s, mailAddress=%s]", userId, mailAddress);
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+
+ public String getMailAddress() {
+ return mailAddress;
+ }
+
+ public void setMailAddress(String mailAddress) {
+ this.mailAddress = mailAddress;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(userId)
+ .append(mailAddress)
+ .build();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == this) {
+ return true;
+ }
+
+ if (!(that instanceof PublisherEntity)) {
+ return false;
+ }
+
+ PublisherEntity another = (PublisherEntity)that;
+
+ return another.userId.equals(this.userId) && another.mailAddress.equals(this.mailAddress);
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
new file mode 100644
index 0000000..e0e579a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.*;
+import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator;
+import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator;
+import org.apache.eagle.jpm.analyzer.publisher.EagleStorePublisher;
+import org.apache.eagle.jpm.analyzer.publisher.EmailPublisher;
+import org.apache.eagle.jpm.analyzer.publisher.Publisher;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MRJobPerformanceAnalyzer implements JobAnalyzer, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(MRJobPerformanceAnalyzer.class);
+
+ private List<Evaluator> evaluators = new ArrayList<>();
+ private List<Publisher> publishers = new ArrayList<>();
+
+ private Config config;
+
+ public MRJobPerformanceAnalyzer(Config config) {
+ this.config = config;
+ evaluators.add(new SLAJobEvaluator(config));
+ evaluators.add(new JobSuggestionEvaluator(config));
+
+ publishers.add(new EagleStorePublisher(config));
+ publishers.add(new EmailPublisher(config));
+ }
+
+ @Override
+ public void analysis(AnalyzerEntity analyzerJobEntity) throws Exception {
+ Result result = new Result();
+
+ for (Evaluator evaluator : evaluators) {
+ result.addEvaluatorResult(evaluator.getClass(), evaluator.evaluate(analyzerJobEntity));
+ }
+
+ for (Publisher publisher : publishers) {
+ publisher.publish(analyzerJobEntity, result);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
new file mode 100644
index 0000000..f10b68d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.sla;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.mr.sla.processors.LongStuckJobProcessor;
+import org.apache.eagle.jpm.analyzer.mr.sla.processors.UnExpectedLongDurationJobProcessor;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.analyzer.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SLAJobEvaluator implements Evaluator, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(SLAJobEvaluator.class);
+
+ private List<Processor> processors = new ArrayList<>();
+ private Config config;
+
+ public SLAJobEvaluator(Config config) {
+ this.config = config;
+ processors.add(new UnExpectedLongDurationJobProcessor(config));
+ processors.add(new LongStuckJobProcessor(config));
+ }
+
+ @Override
+ public Result.EvaluatorResult evaluate(AnalyzerEntity analyzerJobEntity) {
+ Result.EvaluatorResult result = new Result.EvaluatorResult();
+
+ List<JobMetaEntity> jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getJobDefId());
+ if (jobMetaEntities.size() == 0
+ || !jobMetaEntities.get(0).getEvaluators().contains(this.getClass().getName())) {
+ LOG.info("SLAJobEvaluator skip job {}", analyzerJobEntity.getJobDefId());
+ return result;
+ }
+
+ analyzerJobEntity.setJobMeta(jobMetaEntities.get(0).getConfiguration());
+
+ for (Processor processor : processors) {
+ result.addProcessorResult(processor.getClass(), processor.process(analyzerJobEntity));
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
new file mode 100644
index 0000000..35f3b27
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.sla.processors;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class LongStuckJobProcessor implements Processor, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(LongStuckJobProcessor.class);
+
+ private Config config;
+
+ public LongStuckJobProcessor(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) {
+ LOG.info("Job {} In LongStuckJobProcessor", analyzerJobEntity.getJobDefId());
+ return new Result.ProcessorResult(Result.ResultLevel.NONE, "");
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
new file mode 100644
index 0000000..9d4ce2b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.sla.processors;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.util.Constants;
+import org.apache.eagle.jpm.analyzer.util.Utils;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URLEncoder;
+import java.util.List;
+import java.util.Map;
+
+public class UnExpectedLongDurationJobProcessor implements Processor, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(UnExpectedLongDurationJobProcessor.class);
+
+ private Config config;
+
+ public UnExpectedLongDurationJobProcessor(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) {
+ LOG.info("Job {} In UnExpectedLongDurationJobProcessor", analyzerJobEntity.getJobDefId());
+
+ Map<String, Object> jobMetaData = analyzerJobEntity.getJobMeta();
+ long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData);
+ if (avgDurationTime == 0L) {
+ return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE);
+ }
+
+ Map<Result.ResultLevel, Double> alertThreshold = Constants.DEFAULT_ALERT_THRESHOLD;
+ if (jobMetaData.containsKey(Constants.ALERT_THRESHOLD_KEY)) {
+ alertThreshold = (Map<Result.ResultLevel, Double>)jobMetaData.get(Constants.ALERT_THRESHOLD_KEY);
+ }
+ List<Map.Entry<Result.ResultLevel, Double>> sorted = Utils.sortByValue(alertThreshold);
+
+ double expirePercent = (analyzerJobEntity.getDurationTime() - avgDurationTime) * 1.0 / avgDurationTime;
+ for (Map.Entry<Result.ResultLevel, Double> entry : sorted) {
+ if (expirePercent >= entry.getValue()) {
+ return new Result.ProcessorResult(entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is %ds",
+ (int)(expirePercent * 100), avgDurationTime / 1000));
+ }
+ }
+
+ return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE);
+ }
+
+ private long getAvgDuration(AnalyzerEntity mrJobAnalysisEntity, Map<String, Object> jobMetaData) {
+ IEagleServiceClient client = new EagleServiceClientImpl(
+ config.getString(Constants.HOST_PATH),
+ config.getInt(Constants.PORT_PATH),
+ config.getString(Constants.USERNAME_PATH),
+ config.getString(Constants.PASSWORD_PATH));
+
+ client.setReadTimeout(config.getInt(Constants.READ_TIMEOUT_PATH) * 1000);
+
+ try {
+ int timeLength = Constants.DEFAULT_EVALUATOR_TIME_LENGTH;
+ try {
+ if (jobMetaData.containsKey(Constants.EVALUATOR_TIME_LENGTH_KEY)) {
+ timeLength = Integer.parseInt(jobMetaData.get(Constants.EVALUATOR_TIME_LENGTH_KEY).toString());
+ }
+ } catch (Exception e) {
+ LOG.warn("exception found when parse timeLength {}, use default", e);
+ }
+
+ String query = String.format("%s[@site=\"%s\" and @jobDefId=\"%s\"]<@site>{avg(durationTime)}",
+ org.apache.eagle.jpm.util.Constants.JPA_JOB_EXECUTION_SERVICE_NAME,
+ mrJobAnalysisEntity.getSiteId(),
+ URLEncoder.encode(mrJobAnalysisEntity.getJobDefId()));
+
+ GenericServiceAPIResponseEntity response = client
+ .search(query)
+ .startTime(System.currentTimeMillis() - (timeLength + 1) * 24 * 60 * 60000L)
+ .endTime(System.currentTimeMillis() - 24 * 60 * 60000L)
+ .pageSize(10)
+ .send();
+
+ List<Map<List<String>, List<Double>>> results = response.getObj();
+ if (results.size() == 0) {
+ return 0L;
+ }
+ return results.get(0).get("value").get(0).longValue();
+ } catch (Exception e) {
+ LOG.warn("{}", e);
+ return 0L;
+ } finally {
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOG.warn("{}", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
new file mode 100644
index 0000000..79f5318
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class JobSuggestionEvaluator implements Evaluator, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(JobSuggestionEvaluator.class);
+
+ private Config config;
+
+ public JobSuggestionEvaluator(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public Result.EvaluatorResult evaluate(AnalyzerEntity mrJobEntity) {
+ Result.EvaluatorResult result = new Result.EvaluatorResult();
+ //TODO
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
new file mode 100644
index 0000000..6109704
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class EagleStorePublisher implements Publisher, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(EagleStorePublisher.class);
+
+ private Config config;
+
+ public EagleStorePublisher(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public void publish(AnalyzerEntity analyzerJobEntity, Result result) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
new file mode 100644
index 0000000..4e49094
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.app.service.ApplicationEmailService;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.mail.AlertEmailConstants;
+import org.apache.eagle.common.mail.AlertEmailContext;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator;
+import org.apache.eagle.jpm.analyzer.util.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EmailPublisher implements Publisher, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(EmailPublisher.class);
+
+ private Config config;
+ private AlertDeduplicator alertDeduplicator;
+
+ public EmailPublisher(Config config) {
+ this.config = config;
+ this.alertDeduplicator = new SimpleDeduplicator();
+ }
+
+ @Override
+ public void publish(AnalyzerEntity analyzerJobEntity, Result result) {
+ if (result.getAlertMessages().size() == 0) {
+ return;
+ }
+
+ LOG.info("EmailPublisher gets job {}", analyzerJobEntity.getJobDefId());
+ if (alertDeduplicator.dedup(analyzerJobEntity, result)) {
+ LOG.info("skip job {} alert because it is duplicated", analyzerJobEntity.getJobDefId());
+ return;
+ }
+
+ Map<String, String> basic = new HashMap<>();
+ basic.put("site", analyzerJobEntity.getSiteId());
+ basic.put("name", analyzerJobEntity.getJobDefId());
+ basic.put("user", analyzerJobEntity.getUserId());
+ basic.put("status", analyzerJobEntity.getCurrentState());
+ basic.put("duration", analyzerJobEntity.getDurationTime() * 1.0 / 1000 + "s");
+ basic.put("start", DateTimeUtil.millisecondsToHumanDateWithSeconds(analyzerJobEntity.getStartTime()));
+ basic.put("end", analyzerJobEntity.getEndTime() == 0
+ ? "0"
+ : DateTimeUtil.millisecondsToHumanDateWithSeconds(analyzerJobEntity.getEndTime()));
+ basic.put("progress", analyzerJobEntity.getProgress() + "%");
+ basic.put("detail", getJobLink(analyzerJobEntity));
+
+
+ Map<String, Map<String, String>> extend = new HashMap<>();
+ Map<String, List<Pair<Result.ResultLevel, String>>> alertMessages = result.getAlertMessages();
+ for (String evaluator : alertMessages.keySet()) {
+ List<Pair<Result.ResultLevel, String>> messages = alertMessages.get(evaluator);
+ extend.put(evaluator, new HashMap<>());
+ for (Pair<Result.ResultLevel, String> message : messages) {
+ LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]",
+ analyzerJobEntity.getJobDefId(), message.getRight(), message.getLeft(), evaluator);
+ extend.get(evaluator).put(message.getRight(), message.getLeft().toString());
+ }
+ }
+
+ Map<String, Object> alertData = new HashMap<>();
+ alertData.put(Constants.ANALYZER_REPORT_DATA_BASIC_KEY, basic);
+ alertData.put(Constants.ANALYZER_REPORT_DATA_EXTEND_KEY, extend);
+
+ //TODO, override email config in job meta data
+ ApplicationEmailService emailService = new ApplicationEmailService(config, Constants.ANALYZER_REPORT_CONFIG_PATH);
+ String subject = String.format(Constants.ANALYZER_REPORT_SUBJECT, analyzerJobEntity.getJobDefId());
+ AlertEmailContext alertContext = emailService.buildEmailContext(subject);
+ emailService.onAlert(alertContext, alertData);
+ }
+
+ private String getJobLink(AnalyzerEntity analyzerJobEntity) {
+ return "http://"
+ + config.getString(Constants.HOST_PATH)
+ + ":"
+ + config.getInt(Constants.PORT_PATH)
+ + "/#/site/"
+ + analyzerJobEntity.getSiteId()
+ + "/jpm/detail/"
+ + analyzerJobEntity.getJobId();
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
new file mode 100644
index 0000000..2f42bf9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+
+public interface Publisher {
+ void publish(AnalyzerEntity analyzerJobEntity, Result result);
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
new file mode 100644
index 0000000..a12f589
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Result {
+ //for EagleStorePublisher
+ private TaggedLogAPIEntity alertEntity = null;//TODO
+ //for EmailPublisher
+ private Map<String, List<Pair<ResultLevel, String>>> alertMessages = new HashMap<>();
+
+ public void addEvaluatorResult(Class<?> type, EvaluatorResult result) {
+ Map<Class<?>, ProcessorResult> processorResults = result.getProcessorResults();
+ for (Class<?> processorType : processorResults.keySet()) {
+ ProcessorResult processorResult = processorResults.get(processorType);
+ if (processorResult.resultLevel.equals(ResultLevel.NONE)) {
+ continue;
+ }
+
+ String typeName = type.getName();
+ if (!alertMessages.containsKey(typeName)) {
+ alertMessages.put(typeName, new ArrayList<>());
+ }
+ alertMessages.get(typeName).add(Pair.of(processorResult.getResultLevel(), processorResult.getMessage()));
+ }
+ }
+
+ public TaggedLogAPIEntity getAlertEntity() {
+ return alertEntity;
+ }
+
+ public Map<String, List<Pair<ResultLevel, String>>> getAlertMessages() {
+ return alertMessages;
+ }
+
+ /**
+ * Processor result.
+ */
+
+ public enum ResultLevel {
+ NONE,
+ NOTICE,
+ WARNING,
+ CRITICAL
+ }
+
+ public static class ProcessorResult {
+ private ResultLevel resultLevel;
+ private String message;
+
+ public ProcessorResult(ResultLevel resultLevel, String message) {
+ this.resultLevel = resultLevel;
+ this.message = message;
+ }
+
+ public ResultLevel getResultLevel() {
+ return resultLevel;
+ }
+
+ public void setResultLevel(ResultLevel resultLevel) {
+ this.resultLevel = resultLevel;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+ }
+
+ /**
+ * Evaluator result.
+ */
+ public static class EvaluatorResult {
+ private Map<Class<?>, ProcessorResult> processorResults = new HashMap<>();
+
+ public void addProcessorResult(Class<?> type, ProcessorResult result) {
+ this.processorResults.put(type, result);
+ }
+
+ public Map<Class<?>, ProcessorResult> getProcessorResults() {
+ return this.processorResults;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
new file mode 100644
index 0000000..4b18f7c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher.dedup;
+
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+public interface AlertDeduplicator {
+ boolean dedup(AnalyzerEntity analyzerJobEntity, Result result);
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
new file mode 100644
index 0000000..09f1af6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher.dedup.impl;
+
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
+import org.apache.eagle.jpm.analyzer.util.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * dedup by jobDefId.
+ */
+public class SimpleDeduplicator implements AlertDeduplicator, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleDeduplicator.class);
+
+ private Map<String, Long> lastUpdateTime = new HashMap<>();
+
+ @Override
+ public boolean dedup(AnalyzerEntity analyzerJobEntity, Result result) {
+ long dedupInterval = Constants.DEFAULT_DEDUP_INTERVAL;
+ if (analyzerJobEntity.getJobMeta().containsKey(Constants.DEDUP_INTERVAL_KEY)) {
+ dedupInterval = (Long)analyzerJobEntity.getJobMeta().get(Constants.DEDUP_INTERVAL_KEY);
+ }
+
+ dedupInterval = dedupInterval * 1000;
+ long currentTimeStamp = System.currentTimeMillis();
+ if (lastUpdateTime.containsKey(analyzerJobEntity.getJobDefId())) {
+ if (lastUpdateTime.get(analyzerJobEntity.getJobDefId()) + dedupInterval > currentTimeStamp) {
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp);
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
new file mode 100644
index 0000000..dc09202
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.resource;
+
+import com.google.inject.Inject;
+import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+import org.apache.eagle.metadata.resource.RESTResponse;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import java.util.List;
+
+import static org.apache.eagle.jpm.analyzer.util.Constants.*;
+
+@Path(ANALYZER_PATH)
+public class AnalyzerResource {
+ @Inject
+ MetaManagementService metaManagementService;
+
+ public AnalyzerResource() {
+ }
+
+ @POST
+ @Path(META_PATH)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<Void> addJobMeta(JobMetaEntity jobMetaEntity) {
+ return RESTResponse.<Void>async((response) -> {
+ jobMetaEntity.ensureDefault();
+ boolean ret = metaManagementService.addJobMeta(jobMetaEntity);
+ String message = "Successfully add job meta for " + jobMetaEntity.getJobDefId();
+ if (!ret) {
+ message = "Failed to add job meta for " + jobMetaEntity.getJobDefId();
+ }
+ response.success(ret).message(message);
+ }).get();
+ }
+
+ @POST
+ @Path(JOB_META_PATH)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<Void> updateJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId, JobMetaEntity jobMetaEntity) {
+ return RESTResponse.<Void>async((response) -> {
+ jobMetaEntity.ensureDefault();
+ boolean ret = metaManagementService.updateJobMeta(jobDefId, jobMetaEntity);
+ String message = "Successfully update job meta for " + jobDefId;
+ if (!ret) {
+ message = "Failed to update job meta for " + jobDefId;
+ }
+ response.success(ret).message(message);
+ }).get();
+ }
+
+ @GET
+ @Path(JOB_META_PATH)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<List<JobMetaEntity>> getJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) {
+ return RESTResponse.async(() -> metaManagementService.getJobMeta(jobDefId)).get();
+ }
+
+ @DELETE
+ @Path(JOB_META_PATH)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<Void> deleteJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) {
+ return RESTResponse.<Void>async((response) -> {
+ boolean ret = metaManagementService.deleteJobMeta(jobDefId);
+ String message = "Successfully delete job meta for " + jobDefId;
+ if (!ret) {
+ message = "Failed to delete job meta for " + jobDefId;
+ }
+
+ response.success(ret).message(message);
+ }).get();
+ }
+
+ @POST
+ @Path(PUBLISHER_PATH)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<Void> addPublisherMeta(PublisherEntity publisherEntity) {
+ return RESTResponse.<Void>async((response) -> {
+ publisherEntity.ensureDefault();
+ boolean ret = metaManagementService.addPublisherMeta(publisherEntity);
+ String message = "Successfully add publisher meta for " + publisherEntity.getUserId();
+ if (!ret) {
+ message = "Failed to add publisher meta for " + publisherEntity.getUserId();
+ }
+ response.success(ret).message(message);
+ }).get();
+ }
+
+ @DELETE
+ @Path(PUBLISHER_META_PATH)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<Void> deletePublisherMeta(@PathParam(USER_PATH) String userId) {
+ return RESTResponse.<Void>async((response) -> {
+ boolean ret = metaManagementService.deletePublisherMeta(userId);
+ String message = "Successfully delete publisher meta for " + userId;
+ if (!ret) {
+ message = "Failed to delete publisher meta for " + userId;
+ }
+ response.success(ret).message(message);
+ }).get();
+ }
+
+ @GET
+ @Path(PUBLISHER_META_PATH)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<List<PublisherEntity>> getPublisherMeta(@PathParam(USER_PATH) String userId) {
+ return RESTResponse.async(() -> metaManagementService.getPublisherMeta(userId)).get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
new file mode 100644
index 0000000..774e6d2
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.analyzer.util;
+
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class Constants {
+ public static final String HOST_PATH = "service.host";
+ public static final String PORT_PATH = "service.port";
+ public static final String USERNAME_PATH = "service.username";
+ public static final String PASSWORD_PATH = "service.password";
+ public static final String CONTEXT_PATH = "service.context";
+ public static final String READ_TIMEOUT_PATH = "service.readTimeOutSeconds";
+
+ public static final String META_PATH = "/metadata";
+ public static final String ANALYZER_PATH = "/job/analyzer";
+ public static final String JOB_DEF_PATH = "jobDefId";
+ public static final String JOB_META_PATH = META_PATH + "/{" + JOB_DEF_PATH + "}";
+
+ public static final String PUBLISHER_PATH = "/publisher";
+ public static final String USER_PATH = "userId";
+ public static final String PUBLISHER_META_PATH = PUBLISHER_PATH + "/{" + USER_PATH + "}";
+
+ public static final String PROCESS_NONE = "PROCESS_NONE";
+
+ public static final String EVALUATOR_TIME_LENGTH_KEY = "evaluator.timeLength";
+ public static final int DEFAULT_EVALUATOR_TIME_LENGTH = 7;//7 days
+
+ public static final String ALERT_THRESHOLD_KEY = "alert.threshold";
+ public static final Map<Result.ResultLevel, Double> DEFAULT_ALERT_THRESHOLD = new HashMap<Result.ResultLevel, Double>() {
+ {
+ put(Result.ResultLevel.NOTICE, 0.1);
+ put(Result.ResultLevel.WARNING, 0.3);
+ put(Result.ResultLevel.CRITICAL, 0.5);
+ }
+ };
+
+ public static final String DEDUP_INTERVAL_KEY = "alert.dedupInterval"; //seconds
+ public static final int DEFAULT_DEDUP_INTERVAL = 300;
+
+ public static final String ANALYZER_REPORT_CONFIG_PATH = "application.analyzerReport";
+ public static final String ANALYZER_REPORT_SUBJECT = "Job Performance Alert For Job: %s";
+
+ public static final String ANALYZER_REPORT_DATA_BASIC_KEY = "basic";
+ public static final String ANALYZER_REPORT_DATA_EXTEND_KEY = "extend";
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java
new file mode 100644
index 0000000..66f7622
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.analyzer.util;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
+import org.apache.eagle.metadata.resource.RESTResponse;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.net.URLEncoder;
+import java.util.*;
+
+public class Utils {
+ private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+ private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+ static {
+ OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+ }
+
+ public static List<JobMetaEntity> getJobMeta(Config config, String jobDefId) {
+ List<JobMetaEntity> result = new ArrayList<>();
+ String url = "http://"
+ + config.getString(Constants.HOST_PATH)
+ + ":"
+ + config.getInt(Constants.PORT_PATH)
+ + config.getString(Constants.CONTEXT_PATH)
+ + Constants.ANALYZER_PATH
+ + Constants.META_PATH
+ + "/"
+ + URLEncoder.encode(jobDefId);
+
+ InputStream is = null;
+ try {
+ is = InputStreamUtils.getInputStream(url, null, org.apache.eagle.jpm.util.Constants.CompressionType.NONE);
+ LOG.info("get job meta from {}", url);
+ result = (List<JobMetaEntity>)OBJ_MAPPER.readValue(is, RESTResponse.class).getData();
+ } catch (Exception e) {
+ LOG.warn("failed to get job meta from {}", url, e);
+ } finally {
+ org.apache.eagle.jpm.util.Utils.closeInputStream(is);
+ return result;
+ }
+ }
+
+ public static <K, V extends Comparable<? super V>> List<Map.Entry<K, V>> sortByValue(Map<K, V> map) {
+ List<Map.Entry<K, V>> list = new LinkedList<>(map.entrySet());
+ Collections.sort(list, (e1, e2) -> e1.getValue().compareTo(e2.getValue()));
+ Collections.reverse(list);
+ return list;
+ }
+}