You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/01/12 02:23:30 UTC
[2/2] eagle git commit: [EAGLE-797] add job performance analysis
[EAGLE-797] add job performance analysis
for continuous development
Author: wujinhu <wu...@126.com>
Closes #762 from wujinhu/EAGLE-797.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/b4695801
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/b4695801
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/b4695801
Branch: refs/heads/master
Commit: b4695801f6c93ca0991fd9758c470b784aa8c781
Parents: 2adbbf5
Author: wujinhu <wu...@126.com>
Authored: Thu Jan 12 10:23:02 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Thu Jan 12 10:23:02 2017 +0800
----------------------------------------------------------------------
.../app/spi/AbstractApplicationProvider.java | 3 +-
...adoopQueueRunningApplicationHealthCheck.java | 3 +-
.../jpm/aggregation/mr/MRMetricAggregator.java | 3 +
.../mr/MRMetricsAggregateContainer.java | 1 +
eagle-jpm/eagle-jpm-analyzer/pom.xml | 38 ++++++
.../eagle/jpm/analyzer/AnalyzerEntity.java | 130 ++++++++++++++++++
.../apache/eagle/jpm/analyzer/Evaluator.java | 24 ++++
.../apache/eagle/jpm/analyzer/JobAnalyzer.java | 28 ++++
.../apache/eagle/jpm/analyzer/Processor.java | 24 ++++
.../analyzer/meta/MetaManagementService.java | 39 ++++++
.../impl/MetaManagementServiceJDBCImpl.java | 77 +++++++++++
.../impl/MetaManagementServiceMemoryImpl.java | 127 ++++++++++++++++++
.../jpm/analyzer/meta/model/JobMetaEntity.java | 85 ++++++++++++
.../analyzer/meta/model/PublisherEntity.java | 77 +++++++++++
.../analyzer/mr/MRJobPerformanceAnalyzer.java | 65 +++++++++
.../jpm/analyzer/mr/sla/SLAJobEvaluator.java | 67 ++++++++++
.../sla/processors/LongStuckJobProcessor.java | 43 ++++++
.../UnExpectedLongDurationJobProcessor.java | 120 +++++++++++++++++
.../mr/suggestion/JobSuggestionEvaluator.java | 44 +++++++
.../analyzer/publisher/EagleStorePublisher.java | 40 ++++++
.../jpm/analyzer/publisher/EmailPublisher.java | 109 +++++++++++++++
.../eagle/jpm/analyzer/publisher/Publisher.java | 24 ++++
.../eagle/jpm/analyzer/publisher/Result.java | 109 +++++++++++++++
.../publisher/dedup/AlertDeduplicator.java | 25 ++++
.../dedup/impl/SimpleDeduplicator.java | 59 +++++++++
.../jpm/analyzer/resource/AnalyzerResource.java | 131 +++++++++++++++++++
.../eagle/jpm/analyzer/util/Constants.java | 65 +++++++++
.../apache/eagle/jpm/analyzer/util/Utils.java | 74 +++++++++++
.../main/resources/AnalyzerReportTemplate.vm | 131 +++++++++++++++++++
.../src/main/resources/createTable.sql | 23 ++++
.../MRHistoryJobApplicationHealthCheck.java | 1 -
...JobConfigurationCreationServiceListener.java | 1 -
.../JobEntityCreationEagleServiceListener.java | 3 +-
.../parser/TaskAttemptCounterListener.java | 3 +-
.../mr/history/parser/TaskFailureListener.java | 1 -
eagle-jpm/eagle-jpm-mr-running/pom.xml | 10 ++
.../jpm/mr/running/MRRunningJobApplication.java | 4 +-
.../MRRunningJobApplicationProvider.java | 11 ++
.../parser/MRJobEntityCreationHandler.java | 1 -
.../jpm/mr/running/parser/MRJobParser.java | 25 +++-
.../mr/running/storm/MRRunningJobParseBolt.java | 9 +-
.../mr/running/MRRunningJobApplicationTest.java | 4 +-
.../jpm/mr/running/parser/MRJobParserTest.java | 17 +--
.../SparkHistoryJobApplicationHealthCheck.java | 3 +-
.../history/crawl/JHFSparkEventReader.java | 2 +-
.../parser/SparkAppEntityCreationHandler.java | 2 +-
.../java/org/apache/eagle/jpm/util/Utils.java | 6 +-
eagle-jpm/pom.xml | 1 +
eagle-server-assembly/src/main/conf/eagle.conf | 6 +
.../TopologyCheckApplicationHealthCheck.java | 3 +-
50 files changed, 1868 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
index 2a8d7c0..e537643 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
@@ -19,6 +19,7 @@ package org.apache.eagle.app.spi;
import com.google.common.base.Preconditions;
import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.app.Application;
import org.apache.eagle.app.service.ApplicationListener;
@@ -130,7 +131,7 @@ public abstract class AbstractApplicationProvider<T extends Application> impleme
currentRegistry.register(scope, new AbstractModule() {
@Override
protected void configure() {
- bind(type).to(impl);
+ bind(type).to(impl).in(Singleton.class);
}
});
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
index 6471dfc..5a5d0ee 100644
--- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java
@@ -52,7 +52,7 @@ public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthC
eagleServiceConfig.eagleService.username,
eagleServiceConfig.eagleService.password);
- client.getJerseyClient().setReadTimeout(60000);
+ client.setReadTimeout(60000);
String message = "";
try {
@@ -91,7 +91,6 @@ public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthC
} catch (Exception e) {
return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e)));
} finally {
- client.getJerseyClient().destroy();
try {
client.close();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
index f8840b2..54bd29b 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java
@@ -74,6 +74,8 @@ public class MRMetricAggregator implements MetricAggregator, Serializable {
.endTime(endTime)
.pageSize(Integer.MAX_VALUE)
.send();
+
+ client.close();
} catch (Exception e) {
LOG.warn("{}", e);
return false;
@@ -151,6 +153,7 @@ public class MRMetricAggregator implements MetricAggregator, Serializable {
client.create(entities);
LOG.info("finish flushing entities of total number " + entities.size());
entities.clear();
+ client.close();
} catch (Exception e) {
LOG.warn("{}", e);
return false;
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
index 45bbcef..dd1980b 100644
--- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
+++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java
@@ -74,6 +74,7 @@ public class MRMetricsAggregateContainer implements MetricsAggregateContainer, S
List<Map<List<String>, List<Double>>> results = response.getObj();
long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue();
+ client.close();
return currentProcessTimeStamp;
} catch (Exception e) {
LOG.warn("{}", e);
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/pom.xml b/eagle-jpm/eagle-jpm-analyzer/pom.xml
new file mode 100644
index 0000000..d6383b6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>eagle-jpm-parent</artifactId>
+ <groupId>org.apache.eagle</groupId>
+ <version>0.5.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>eagle-jpm-analyzer</artifactId>
+ <name>Eagle::App::JPM::Analyzer</name>
+ <url>http://maven.apache.org</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-metadata-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-jpm-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-app-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
new file mode 100644
index 0000000..f9b7af0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * will refactor later if other types of job needs this.
+ * AnalyzerEntity for each job needed to be analysised
+ */
+public class AnalyzerEntity {
+ private String jobDefId;
+ private String jobId;
+ private String siteId;
+ private String userId;
+
+ private long startTime;
+ private long endTime;
+ private long durationTime;
+ private String currentState;
+ private double progress;
+
+ private Map<String, Object> jobConfig = new HashMap<>();
+
+ private Map<String, Object> jobMeta = new HashMap<>();
+
+ public String getJobDefId() {
+ return jobDefId;
+ }
+
+ public void setJobDefId(String jobDefId) {
+ this.jobDefId = jobDefId;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(String jobId) {
+ this.jobId = jobId;
+ }
+
+ public String getSiteId() {
+ return siteId;
+ }
+
+ public void setSiteId(String siteId) {
+ this.siteId = siteId;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(long endTime) {
+ this.endTime = endTime;
+ }
+
+ public long getDurationTime() {
+ return durationTime;
+ }
+
+ public void setDurationTime(long durationTime) {
+ this.durationTime = durationTime;
+ }
+
+ public String getCurrentState() {
+ return currentState;
+ }
+
+ public void setCurrentState(String currentState) {
+ this.currentState = currentState;
+ }
+
+ public Map<String, Object> getJobConfig() {
+ return jobConfig;
+ }
+
+ public void setJobConfig(Map<String, Object> jobConfig) {
+ this.jobConfig = jobConfig;
+ }
+
+ public Map<String, Object> getJobMeta() {
+ return jobMeta;
+ }
+
+ public void setJobMeta(Map<String, Object> jobMeta) {
+ this.jobMeta = jobMeta;
+ }
+
+ public double getProgress() {
+ return progress;
+ }
+
+ public void setProgress(double progress) {
+ this.progress = progress;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
new file mode 100644
index 0000000..6617916
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+public interface Evaluator {
+ Result.EvaluatorResult evaluate(AnalyzerEntity analyzerEntity);
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
new file mode 100644
index 0000000..6cda1cd
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+/**
+ * Each JobAnalyzer contains one or more Evaluators to analysis each job.
+ * Each Evaluator is a group of Processors
+ * Each Processor implements an algorithm or a model to analysis one dimension of a job
+ *
+ */
+public interface JobAnalyzer {
+ void analysis(AnalyzerEntity analyzerEntity) throws Exception;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
new file mode 100644
index 0000000..d5a8a74
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer;
+
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+public interface Processor {
+ Result.ProcessorResult process(AnalyzerEntity jobAnalysisEntity);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
new file mode 100644
index 0000000..0935266
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta;
+
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+
+import java.util.List;
+
+public interface MetaManagementService {
+ boolean addJobMeta(JobMetaEntity jobMetaEntity);
+
+ boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity);
+
+ List<JobMetaEntity> getJobMeta(String jobDefId);
+
+ boolean deleteJobMeta(String jobDefId);
+
+ boolean addPublisherMeta(PublisherEntity publisherEntity);
+
+ boolean deletePublisherMeta(String userId);
+
+ List<PublisherEntity> getPublisherMeta(String userId);
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
new file mode 100644
index 0000000..cfb5029
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.impl;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class MetaManagementServiceJDBCImpl implements MetaManagementService, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceJDBCImpl.class);
+
+ @Inject
+ Config config;
+
+ @Override
+ public boolean addJobMeta(JobMetaEntity jobMetaEntity) {
+
+ return true;
+ }
+
+ @Override
+ public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) {
+
+ return true;
+ }
+
+ @Override
+ public List<JobMetaEntity> getJobMeta(String jobDefId) {
+
+ return null;
+ }
+
+ @Override
+ public boolean deleteJobMeta(String jobDefId) {
+
+ return true;
+ }
+
+ @Override
+ public boolean addPublisherMeta(PublisherEntity publisherEntity) {
+
+ return true;
+ }
+
+ @Override
+ public boolean deletePublisherMeta(String userId) {
+
+ return true;
+ }
+
+ @Override
+ public List<PublisherEntity> getPublisherMeta(String userId) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
new file mode 100644
index 0000000..85e8358
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.impl;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.*;
+
+public class MetaManagementServiceMemoryImpl implements MetaManagementService, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceMemoryImpl.class);
+
+ private final Map<String, JobMetaEntity> jobMetaEntities = new HashMap<>();
+ private final Map<String, List<PublisherEntity>> publisherEntities = new HashMap<>();
+
+ @Inject
+ Config config;
+
+ @Override
+ public boolean addJobMeta(JobMetaEntity jobMetaEntity) {
+ if (jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) {
+ LOG.warn("contains job {} already, add job meta failed", jobMetaEntity.getJobDefId());
+ return false;
+ }
+
+ jobMetaEntities.put(jobMetaEntity.getJobDefId(), jobMetaEntity);
+ LOG.info("Successfully add job {} meta", jobMetaEntity.getJobDefId());
+ return true;
+ }
+
+ @Override
+ public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) {
+ if (!jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) {
+ LOG.warn("does not contain job {}, update job meta failed", jobDefId);
+ return false;
+ }
+
+ jobMetaEntities.put(jobDefId, jobMetaEntity);
+ LOG.info("Successfully update job {} meta", jobDefId);
+ return true;
+ }
+
+ @Override
+ public List<JobMetaEntity> getJobMeta(String jobDefId) {
+ if (!jobMetaEntities.containsKey(jobDefId)) {
+ LOG.warn("does not contain job {}, get job meta failed", jobDefId);
+ return new ArrayList<>();
+ }
+
+ return Arrays.asList(jobMetaEntities.get(jobDefId));
+ }
+
+ @Override
+ public boolean deleteJobMeta(String jobDefId) {
+ if (!jobMetaEntities.containsKey(jobDefId)) {
+ LOG.warn("does not contain job {}, delete job meta failed", jobDefId);
+ return false;
+ }
+
+ jobMetaEntities.remove(jobDefId);
+ LOG.info("Successfully delete job {} meta", jobDefId);
+ return true;
+ }
+
+ @Override
+ public boolean addPublisherMeta(PublisherEntity publisherEntity) {
+ if (publisherEntities.containsKey(publisherEntity.getUserId())) {
+ for (PublisherEntity entity : publisherEntities.get(publisherEntity.getUserId())) {
+ if (entity.equals(publisherEntity)) {
+ LOG.warn("contains user {}, mailAddress {} already, add publisher failed", entity.getUserId(), entity.getMailAddress());
+ return false;
+ }
+ }
+ }
+
+ if (!publisherEntities.containsKey(publisherEntity.getUserId())) {
+ publisherEntities.put(publisherEntity.getUserId(), new ArrayList<>());
+ }
+
+ publisherEntities.get(publisherEntity.getUserId()).add(publisherEntity);
+ LOG.info("Successfully add publisher user {}, mailAddress {}", publisherEntity.getUserId(), publisherEntity.getMailAddress());
+ return true;
+ }
+
+ @Override
+ public boolean deletePublisherMeta(String userId) {
+ if (!publisherEntities.containsKey(userId)) {
+ LOG.warn("does not contain user {}, failed to delete publisher", userId);
+ return false;
+ }
+
+ publisherEntities.remove(userId);
+ LOG.info("Successfully delete publisher user " + userId);
+ return true;
+ }
+
+ @Override
+ public List<PublisherEntity> getPublisherMeta(String userId) {
+ if (!publisherEntities.containsKey(userId)) {
+ LOG.warn("does not contain user {}, failed to get publisher", userId);
+ return new ArrayList<>();
+ }
+
+ return publisherEntities.get(userId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
new file mode 100644
index 0000000..2e15c17
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.model;
+
+import org.apache.eagle.metadata.persistence.PersistenceEntity;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class JobMetaEntity extends PersistenceEntity {
+ private String jobDefId;
+ private String siteId;
+ private Map<String, Object> configuration = new HashMap<>();
+ private Set<String> evaluators = new HashSet<>();
+
+ public JobMetaEntity() {
+
+ }
+
+ public JobMetaEntity(String jobDefId,
+ String siteId,
+ Map<String, Object> configuration,
+ Set<String> evaluators) {
+ this.jobDefId = jobDefId;
+ this.siteId = siteId;
+ this.configuration = configuration;
+ this.evaluators = evaluators;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("JobMetaEntity[jobDefId=%s, siteId=%s]", jobDefId, siteId);
+ }
+
+ public String getJobDefId() {
+ return jobDefId;
+ }
+
+ public void setJobDefId(String jobDefId) {
+ this.jobDefId = jobDefId;
+ }
+
+ public String getSiteId() {
+ return siteId;
+ }
+
+ public void setSiteId(String siteId) {
+ this.siteId = siteId;
+ }
+
+ public Map<String, Object> getConfiguration() {
+ return configuration;
+ }
+
+ public void setConfiguration(Map<String, Object> configuration) {
+ this.configuration = configuration;
+ }
+
+ public Set<String> getEvaluators() {
+ return evaluators;
+ }
+
+ public void setEvaluators(Set<String> evaluators) {
+ this.evaluators = evaluators;
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java
new file mode 100644
index 0000000..bca7ab1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.meta.model;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.metadata.persistence.PersistenceEntity;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PublisherEntity extends PersistenceEntity {
+ private String userId;
+ private String mailAddress;
+
+ public PublisherEntity(String userId, String mailAddress) {
+ this.userId = userId;
+ this.mailAddress = mailAddress;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("PublisherEntity[userId=%s, mailAddress=%s]", userId, mailAddress);
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+
+ public String getMailAddress() {
+ return mailAddress;
+ }
+
+ public void setMailAddress(String mailAddress) {
+ this.mailAddress = mailAddress;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(userId)
+ .append(mailAddress)
+ .build();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == this) {
+ return true;
+ }
+
+ if (!(that instanceof PublisherEntity)) {
+ return false;
+ }
+
+ PublisherEntity another = (PublisherEntity)that;
+
+ return another.userId.equals(this.userId) && another.mailAddress.equals(this.mailAddress);
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
new file mode 100644
index 0000000..e0e579a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.*;
+import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator;
+import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator;
+import org.apache.eagle.jpm.analyzer.publisher.EagleStorePublisher;
+import org.apache.eagle.jpm.analyzer.publisher.EmailPublisher;
+import org.apache.eagle.jpm.analyzer.publisher.Publisher;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MRJobPerformanceAnalyzer implements JobAnalyzer, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(MRJobPerformanceAnalyzer.class);
+
+ private List<Evaluator> evaluators = new ArrayList<>();
+ private List<Publisher> publishers = new ArrayList<>();
+
+ private Config config;
+
+ public MRJobPerformanceAnalyzer(Config config) {
+ this.config = config;
+ evaluators.add(new SLAJobEvaluator(config));
+ evaluators.add(new JobSuggestionEvaluator(config));
+
+ publishers.add(new EagleStorePublisher(config));
+ publishers.add(new EmailPublisher(config));
+ }
+
+ @Override
+ public void analysis(AnalyzerEntity analyzerJobEntity) throws Exception {
+ Result result = new Result();
+
+ for (Evaluator evaluator : evaluators) {
+ result.addEvaluatorResult(evaluator.getClass(), evaluator.evaluate(analyzerJobEntity));
+ }
+
+ for (Publisher publisher : publishers) {
+ publisher.publish(analyzerJobEntity, result);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
new file mode 100644
index 0000000..f10b68d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.sla;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.mr.sla.processors.LongStuckJobProcessor;
+import org.apache.eagle.jpm.analyzer.mr.sla.processors.UnExpectedLongDurationJobProcessor;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.analyzer.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class SLAJobEvaluator implements Evaluator, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(SLAJobEvaluator.class);
+
+ private List<Processor> processors = new ArrayList<>();
+ private Config config;
+
+ public SLAJobEvaluator(Config config) {
+ this.config = config;
+ processors.add(new UnExpectedLongDurationJobProcessor(config));
+ processors.add(new LongStuckJobProcessor(config));
+ }
+
+ @Override
+ public Result.EvaluatorResult evaluate(AnalyzerEntity analyzerJobEntity) {
+ Result.EvaluatorResult result = new Result.EvaluatorResult();
+
+ List<JobMetaEntity> jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getJobDefId());
+ if (jobMetaEntities.size() == 0
+ || !jobMetaEntities.get(0).getEvaluators().contains(this.getClass().getName())) {
+ LOG.info("SLAJobEvaluator skip job {}", analyzerJobEntity.getJobDefId());
+ return result;
+ }
+
+ analyzerJobEntity.setJobMeta(jobMetaEntities.get(0).getConfiguration());
+
+ for (Processor processor : processors) {
+ result.addProcessorResult(processor.getClass(), processor.process(analyzerJobEntity));
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
new file mode 100644
index 0000000..35f3b27
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.sla.processors;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class LongStuckJobProcessor implements Processor, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(LongStuckJobProcessor.class);
+
+ private Config config;
+
+ public LongStuckJobProcessor(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) {
+ LOG.info("Job {} In LongStuckJobProcessor", analyzerJobEntity.getJobDefId());
+ return new Result.ProcessorResult(Result.ResultLevel.NONE, "");
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
new file mode 100644
index 0000000..9d4ce2b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.sla.processors;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.util.Constants;
+import org.apache.eagle.jpm.analyzer.util.Utils;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URLEncoder;
+import java.util.List;
+import java.util.Map;
+
+public class UnExpectedLongDurationJobProcessor implements Processor, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(UnExpectedLongDurationJobProcessor.class);
+
+ private Config config;
+
+ public UnExpectedLongDurationJobProcessor(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) {
+ LOG.info("Job {} In UnExpectedLongDurationJobProcessor", analyzerJobEntity.getJobDefId());
+
+ Map<String, Object> jobMetaData = analyzerJobEntity.getJobMeta();
+ long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData);
+ if (avgDurationTime == 0L) {
+ return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE);
+ }
+
+ Map<Result.ResultLevel, Double> alertThreshold = Constants.DEFAULT_ALERT_THRESHOLD;
+ if (jobMetaData.containsKey(Constants.ALERT_THRESHOLD_KEY)) {
+ alertThreshold = (Map<Result.ResultLevel, Double>)jobMetaData.get(Constants.ALERT_THRESHOLD_KEY);
+ }
+ List<Map.Entry<Result.ResultLevel, Double>> sorted = Utils.sortByValue(alertThreshold);
+
+ double expirePercent = (analyzerJobEntity.getDurationTime() - avgDurationTime) * 1.0 / avgDurationTime;
+ for (Map.Entry<Result.ResultLevel, Double> entry : sorted) {
+ if (expirePercent >= entry.getValue()) {
+ return new Result.ProcessorResult(entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is %ds",
+ (int)(expirePercent * 100), avgDurationTime / 1000));
+ }
+ }
+
+ return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE);
+ }
+
+ private long getAvgDuration(AnalyzerEntity mrJobAnalysisEntity, Map<String, Object> jobMetaData) {
+ IEagleServiceClient client = new EagleServiceClientImpl(
+ config.getString(Constants.HOST_PATH),
+ config.getInt(Constants.PORT_PATH),
+ config.getString(Constants.USERNAME_PATH),
+ config.getString(Constants.PASSWORD_PATH));
+
+ client.setReadTimeout(config.getInt(Constants.READ_TIMEOUT_PATH) * 1000);
+
+ try {
+ int timeLength = Constants.DEFAULT_EVALUATOR_TIME_LENGTH;
+ try {
+ if (jobMetaData.containsKey(Constants.EVALUATOR_TIME_LENGTH_KEY)) {
+ timeLength = Integer.parseInt(jobMetaData.get(Constants.EVALUATOR_TIME_LENGTH_KEY).toString());
+ }
+ } catch (Exception e) {
+ LOG.warn("exception found when parse timeLength {}, use default", e);
+ }
+
+ String query = String.format("%s[@site=\"%s\" and @jobDefId=\"%s\"]<@site>{avg(durationTime)}",
+ org.apache.eagle.jpm.util.Constants.JPA_JOB_EXECUTION_SERVICE_NAME,
+ mrJobAnalysisEntity.getSiteId(),
+ URLEncoder.encode(mrJobAnalysisEntity.getJobDefId()));
+
+ GenericServiceAPIResponseEntity response = client
+ .search(query)
+ .startTime(System.currentTimeMillis() - (timeLength + 1) * 24 * 60 * 60000L)
+ .endTime(System.currentTimeMillis() - 24 * 60 * 60000L)
+ .pageSize(10)
+ .send();
+
+ List<Map<List<String>, List<Double>>> results = response.getObj();
+ if (results.size() == 0) {
+ return 0L;
+ }
+ return results.get(0).get("value").get(0).longValue();
+ } catch (Exception e) {
+ LOG.warn("{}", e);
+ return 0L;
+ } finally {
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOG.warn("{}", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
new file mode 100644
index 0000000..79f5318
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.Evaluator;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class JobSuggestionEvaluator implements Evaluator, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(JobSuggestionEvaluator.class);
+
+ private Config config;
+
+ public JobSuggestionEvaluator(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public Result.EvaluatorResult evaluate(AnalyzerEntity mrJobEntity) {
+ Result.EvaluatorResult result = new Result.EvaluatorResult();
+ //TODO
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
new file mode 100644
index 0000000..6109704
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+public class EagleStorePublisher implements Publisher, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(EagleStorePublisher.class);
+
+ private Config config;
+
+ public EagleStorePublisher(Config config) {
+ this.config = config;
+ }
+
+ @Override
+ public void publish(AnalyzerEntity analyzerJobEntity, Result result) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
new file mode 100644
index 0000000..4e49094
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.app.service.ApplicationEmailService;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.mail.AlertEmailConstants;
+import org.apache.eagle.common.mail.AlertEmailContext;
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator;
+import org.apache.eagle.jpm.analyzer.util.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class EmailPublisher implements Publisher, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(EmailPublisher.class);
+
+ private Config config;
+ private AlertDeduplicator alertDeduplicator;
+
+ public EmailPublisher(Config config) {
+ this.config = config;
+ this.alertDeduplicator = new SimpleDeduplicator();
+ }
+
+ @Override
+ public void publish(AnalyzerEntity analyzerJobEntity, Result result) {
+ if (result.getAlertMessages().size() == 0) {
+ return;
+ }
+
+ LOG.info("EmailPublisher gets job {}", analyzerJobEntity.getJobDefId());
+ if (alertDeduplicator.dedup(analyzerJobEntity, result)) {
+ LOG.info("skip job {} alert because it is duplicated", analyzerJobEntity.getJobDefId());
+ return;
+ }
+
+ Map<String, String> basic = new HashMap<>();
+ basic.put("site", analyzerJobEntity.getSiteId());
+ basic.put("name", analyzerJobEntity.getJobDefId());
+ basic.put("user", analyzerJobEntity.getUserId());
+ basic.put("status", analyzerJobEntity.getCurrentState());
+ basic.put("duration", analyzerJobEntity.getDurationTime() * 1.0 / 1000 + "s");
+ basic.put("start", DateTimeUtil.millisecondsToHumanDateWithSeconds(analyzerJobEntity.getStartTime()));
+ basic.put("end", analyzerJobEntity.getEndTime() == 0
+ ? "0"
+ : DateTimeUtil.millisecondsToHumanDateWithSeconds(analyzerJobEntity.getEndTime()));
+ basic.put("progress", analyzerJobEntity.getProgress() + "%");
+ basic.put("detail", getJobLink(analyzerJobEntity));
+
+
+ Map<String, Map<String, String>> extend = new HashMap<>();
+ Map<String, List<Pair<Result.ResultLevel, String>>> alertMessages = result.getAlertMessages();
+ for (String evaluator : alertMessages.keySet()) {
+ List<Pair<Result.ResultLevel, String>> messages = alertMessages.get(evaluator);
+ extend.put(evaluator, new HashMap<>());
+ for (Pair<Result.ResultLevel, String> message : messages) {
+ LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]",
+ analyzerJobEntity.getJobDefId(), message.getRight(), message.getLeft(), evaluator);
+ extend.get(evaluator).put(message.getRight(), message.getLeft().toString());
+ }
+ }
+
+ Map<String, Object> alertData = new HashMap<>();
+ alertData.put(Constants.ANALYZER_REPORT_DATA_BASIC_KEY, basic);
+ alertData.put(Constants.ANALYZER_REPORT_DATA_EXTEND_KEY, extend);
+
+ //TODO, override email config in job meta data
+ ApplicationEmailService emailService = new ApplicationEmailService(config, Constants.ANALYZER_REPORT_CONFIG_PATH);
+ String subject = String.format(Constants.ANALYZER_REPORT_SUBJECT, analyzerJobEntity.getJobDefId());
+ AlertEmailContext alertContext = emailService.buildEmailContext(subject);
+ emailService.onAlert(alertContext, alertData);
+ }
+
+ private String getJobLink(AnalyzerEntity analyzerJobEntity) {
+ return "http://"
+ + config.getString(Constants.HOST_PATH)
+ + ":"
+ + config.getInt(Constants.PORT_PATH)
+ + "/#/site/"
+ + analyzerJobEntity.getSiteId()
+ + "/jpm/detail/"
+ + analyzerJobEntity.getJobId();
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
new file mode 100644
index 0000000..2f42bf9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+
+public interface Publisher {
+ void publish(AnalyzerEntity analyzerJobEntity, Result result);
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
new file mode 100644
index 0000000..a12f589
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Result {
+ //for EagleStorePublisher
+ private TaggedLogAPIEntity alertEntity = null;//TODO
+ //for EmailPublisher
+ private Map<String, List<Pair<ResultLevel, String>>> alertMessages = new HashMap<>();
+
+ public void addEvaluatorResult(Class<?> type, EvaluatorResult result) {
+ Map<Class<?>, ProcessorResult> processorResults = result.getProcessorResults();
+ for (Class<?> processorType : processorResults.keySet()) {
+ ProcessorResult processorResult = processorResults.get(processorType);
+ if (processorResult.resultLevel.equals(ResultLevel.NONE)) {
+ continue;
+ }
+
+ String typeName = type.getName();
+ if (!alertMessages.containsKey(typeName)) {
+ alertMessages.put(typeName, new ArrayList<>());
+ }
+ alertMessages.get(typeName).add(Pair.of(processorResult.getResultLevel(), processorResult.getMessage()));
+ }
+ }
+
+ public TaggedLogAPIEntity getAlertEntity() {
+ return alertEntity;
+ }
+
+ public Map<String, List<Pair<ResultLevel, String>>> getAlertMessages() {
+ return alertMessages;
+ }
+
+ /**
+ * Processor result.
+ */
+
+ public enum ResultLevel {
+ NONE,
+ NOTICE,
+ WARNING,
+ CRITICAL
+ }
+
+ public static class ProcessorResult {
+ private ResultLevel resultLevel;
+ private String message;
+
+ public ProcessorResult(ResultLevel resultLevel, String message) {
+ this.resultLevel = resultLevel;
+ this.message = message;
+ }
+
+ public ResultLevel getResultLevel() {
+ return resultLevel;
+ }
+
+ public void setResultLevel(ResultLevel resultLevel) {
+ this.resultLevel = resultLevel;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+ }
+
+ /**
+ * Evaluator result.
+ */
+ public static class EvaluatorResult {
+ private Map<Class<?>, ProcessorResult> processorResults = new HashMap<>();
+
+ public void addProcessorResult(Class<?> type, ProcessorResult result) {
+ this.processorResults.put(type, result);
+ }
+
+ public Map<Class<?>, ProcessorResult> getProcessorResults() {
+ return this.processorResults;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
new file mode 100644
index 0000000..4b18f7c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher.dedup;
+
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+public interface AlertDeduplicator {
+ boolean dedup(AnalyzerEntity analyzerJobEntity, Result result);
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
new file mode 100644
index 0000000..09f1af6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.publisher.dedup.impl;
+
+import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
+import org.apache.eagle.jpm.analyzer.util.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * dedup by jobDefId.
+ */
+public class SimpleDeduplicator implements AlertDeduplicator, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleDeduplicator.class);
+
+ private Map<String, Long> lastUpdateTime = new HashMap<>();
+
+ @Override
+ public boolean dedup(AnalyzerEntity analyzerJobEntity, Result result) {
+ long dedupInterval = Constants.DEFAULT_DEDUP_INTERVAL;
+ if (analyzerJobEntity.getJobMeta().containsKey(Constants.DEDUP_INTERVAL_KEY)) {
+ dedupInterval = (Long)analyzerJobEntity.getJobMeta().get(Constants.DEDUP_INTERVAL_KEY);
+ }
+
+ dedupInterval = dedupInterval * 1000;
+ long currentTimeStamp = System.currentTimeMillis();
+ if (lastUpdateTime.containsKey(analyzerJobEntity.getJobDefId())) {
+ if (lastUpdateTime.get(analyzerJobEntity.getJobDefId()) + dedupInterval > currentTimeStamp) {
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp);
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
new file mode 100644
index 0000000..dc09202
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.resource;
+
+import com.google.inject.Inject;
+import org.apache.eagle.jpm.analyzer.meta.MetaManagementService;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity;
+import org.apache.eagle.metadata.resource.RESTResponse;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import java.util.List;
+
+import static org.apache.eagle.jpm.analyzer.util.Constants.*;
+
+@Path(ANALYZER_PATH)
+public class AnalyzerResource {
+ @Inject
+ MetaManagementService metaManagementService;
+
+ public AnalyzerResource() {
+ }
+
+ @POST
+ @Path(META_PATH)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<Void> addJobMeta(JobMetaEntity jobMetaEntity) {
+ return RESTResponse.<Void>async((response) -> {
+ jobMetaEntity.ensureDefault();
+ boolean ret = metaManagementService.addJobMeta(jobMetaEntity);
+ String message = "Successfully add job meta for " + jobMetaEntity.getJobDefId();
+ if (!ret) {
+ message = "Failed to add job meta for " + jobMetaEntity.getJobDefId();
+ }
+ response.success(ret).message(message);
+ }).get();
+ }
+
+ @POST
+ @Path(JOB_META_PATH)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<Void> updateJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId, JobMetaEntity jobMetaEntity) {
+ return RESTResponse.<Void>async((response) -> {
+ jobMetaEntity.ensureDefault();
+ boolean ret = metaManagementService.updateJobMeta(jobDefId, jobMetaEntity);
+ String message = "Successfully update job meta for " + jobDefId;
+ if (!ret) {
+ message = "Failed to update job meta for " + jobDefId;
+ }
+ response.success(ret).message(message);
+ }).get();
+ }
+
+ @GET
+ @Path(JOB_META_PATH)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<List<JobMetaEntity>> getJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) {
+ return RESTResponse.async(() -> metaManagementService.getJobMeta(jobDefId)).get();
+ }
+
+ @DELETE
+ @Path(JOB_META_PATH)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<Void> deleteJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) {
+ return RESTResponse.<Void>async((response) -> {
+ boolean ret = metaManagementService.deleteJobMeta(jobDefId);
+ String message = "Successfully delete job meta for " + jobDefId;
+ if (!ret) {
+ message = "Failed to delete job meta for " + jobDefId;
+ }
+
+ response.success(ret).message(message);
+ }).get();
+ }
+
+ @POST
+ @Path(PUBLISHER_PATH)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<Void> addPublisherMeta(PublisherEntity publisherEntity) {
+ return RESTResponse.<Void>async((response) -> {
+ publisherEntity.ensureDefault();
+ boolean ret = metaManagementService.addPublisherMeta(publisherEntity);
+ String message = "Successfully add publisher meta for " + publisherEntity.getUserId();
+ if (!ret) {
+ message = "Failed to add publisher meta for " + publisherEntity.getUserId();
+ }
+ response.success(ret).message(message);
+ }).get();
+ }
+
+ @DELETE
+ @Path(PUBLISHER_META_PATH)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<Void> deletePublisherMeta(@PathParam(USER_PATH) String userId) {
+ return RESTResponse.<Void>async((response) -> {
+ boolean ret = metaManagementService.deletePublisherMeta(userId);
+ String message = "Successfully delete publisher meta for " + userId;
+ if (!ret) {
+ message = "Failed to delete publisher meta for " + userId;
+ }
+ response.success(ret).message(message);
+ }).get();
+ }
+
+ @GET
+ @Path(PUBLISHER_META_PATH)
+ @Produces(MediaType.APPLICATION_JSON)
+ public RESTResponse<List<PublisherEntity>> getPublisherMeta(@PathParam(USER_PATH) String userId) {
+ return RESTResponse.async(() -> metaManagementService.getPublisherMeta(userId)).get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
new file mode 100644
index 0000000..774e6d2
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.analyzer.util;
+
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class Constants {
+ public static final String HOST_PATH = "service.host";
+ public static final String PORT_PATH = "service.port";
+ public static final String USERNAME_PATH = "service.username";
+ public static final String PASSWORD_PATH = "service.password";
+ public static final String CONTEXT_PATH = "service.context";
+ public static final String READ_TIMEOUT_PATH = "service.readTimeOutSeconds";
+
+ public static final String META_PATH = "/metadata";
+ public static final String ANALYZER_PATH = "/job/analyzer";
+ public static final String JOB_DEF_PATH = "jobDefId";
+ public static final String JOB_META_PATH = META_PATH + "/{" + JOB_DEF_PATH + "}";
+
+ public static final String PUBLISHER_PATH = "/publisher";
+ public static final String USER_PATH = "userId";
+ public static final String PUBLISHER_META_PATH = PUBLISHER_PATH + "/{" + USER_PATH + "}";
+
+ public static final String PROCESS_NONE = "PROCESS_NONE";
+
+ public static final String EVALUATOR_TIME_LENGTH_KEY = "evaluator.timeLength";
+ public static final int DEFAULT_EVALUATOR_TIME_LENGTH = 7;//7 days
+
+ public static final String ALERT_THRESHOLD_KEY = "alert.threshold";
+ public static final Map<Result.ResultLevel, Double> DEFAULT_ALERT_THRESHOLD = new HashMap<Result.ResultLevel, Double>() {
+ {
+ put(Result.ResultLevel.NOTICE, 0.1);
+ put(Result.ResultLevel.WARNING, 0.3);
+ put(Result.ResultLevel.CRITICAL, 0.5);
+ }
+ };
+
+ public static final String DEDUP_INTERVAL_KEY = "alert.dedupInterval"; //seconds
+ public static final int DEFAULT_DEDUP_INTERVAL = 300;
+
+ public static final String ANALYZER_REPORT_CONFIG_PATH = "application.analyzerReport";
+ public static final String ANALYZER_REPORT_SUBJECT = "Job Performance Alert For Job: %s";
+
+ public static final String ANALYZER_REPORT_DATA_BASIC_KEY = "basic";
+ public static final String ANALYZER_REPORT_DATA_EXTEND_KEY = "extend";
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java
new file mode 100644
index 0000000..66f7622
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.analyzer.util;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity;
+import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils;
+import org.apache.eagle.metadata.resource.RESTResponse;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.net.URLEncoder;
+import java.util.*;
+
+public class Utils {
+ private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+
+ private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+ static {
+ OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+ }
+
+ public static List<JobMetaEntity> getJobMeta(Config config, String jobDefId) {
+ List<JobMetaEntity> result = new ArrayList<>();
+ String url = "http://"
+ + config.getString(Constants.HOST_PATH)
+ + ":"
+ + config.getInt(Constants.PORT_PATH)
+ + config.getString(Constants.CONTEXT_PATH)
+ + Constants.ANALYZER_PATH
+ + Constants.META_PATH
+ + "/"
+ + URLEncoder.encode(jobDefId);
+
+ InputStream is = null;
+ try {
+ is = InputStreamUtils.getInputStream(url, null, org.apache.eagle.jpm.util.Constants.CompressionType.NONE);
+ LOG.info("get job meta from {}", url);
+ result = (List<JobMetaEntity>)OBJ_MAPPER.readValue(is, RESTResponse.class).getData();
+ } catch (Exception e) {
+ LOG.warn("failed to get job meta from {}", url, e);
+ } finally {
+ org.apache.eagle.jpm.util.Utils.closeInputStream(is);
+ return result;
+ }
+ }
+
+ public static <K, V extends Comparable<? super V>> List<Map.Entry<K, V>> sortByValue(Map<K, V> map) {
+ List<Map.Entry<K, V>> list = new LinkedList<>(map.entrySet());
+ Collections.sort(list, (e1, e2) -> e1.getValue().compareTo(e2.getValue()));
+ Collections.reverse(list);
+ return list;
+ }
+}