You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2018/04/01 19:14:09 UTC
[ambari] 13/39: AMBARI-21686 : Implement a test driver that
provides a set of metric series with different kinds of metric behavior.
(avijayan)
This is an automated email from the ASF dual-hosted git repository.
avijayan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
commit e466cc4fd06a30c9fd62eb99923576664233f06e
Author: Aravindan Vijayan <av...@hortonworks.com>
AuthorDate: Thu Sep 21 15:40:35 2017 -0700
AMBARI-21686 : Implement a test driver that provides a set of metric series with different kinds of metric behavior. (avijayan)
---
ambari-metrics/ambari-metrics-alertservice/pom.xml | 17 +-
.../ambari/metrics/alertservice/R/AmsRTest.java | 147 --------
.../metrics/alertservice/R/RFunctionInvoker.java | 192 -----------
.../metrics/alertservice/common/MetricAnomaly.java | 69 ----
.../common/SingleValuedTimelineMetric.java | 103 ------
.../alertservice/common/TimelineMetric.java | 238 -------------
.../alertservice/common/TimelineMetrics.java | 129 -------
.../metrics/alertservice/methods/ema/EmaDS.java | 70 ----
.../metrics/alertservice/methods/ema/EmaModel.java | 129 -------
.../alertservice/methods/ema/TestEmaModel.java | 68 ----
.../prototype/AmbariServerInterface.java | 122 +++++++
.../prototype/MetricAnomalyDetectorTestInput.java | 126 +++++++
.../prototype/MetricAnomalyTester.java | 163 +++++++++
.../MetricKafkaProducer.java} | 54 +--
.../prototype/MetricSparkConsumer.java | 178 ++++++++++
.../prototype/MetricsCollectorInterface.java | 237 +++++++++++++
.../prototype/PointInTimeADSystem.java | 256 ++++++++++++++
.../alertservice/prototype/RFunctionInvoker.java | 222 ++++++++++++
.../prototype/TestSeriesInputRequest.java | 88 +++++
.../alertservice/prototype/TrendADSystem.java | 331 ++++++++++++++++++
.../TrendMetric.java} | 18 +-
.../common/DataSeries.java} | 12 +-
.../{ => prototype}/common/ResultSet.java | 4 +-
.../{ => prototype}/common/StatisticUtils.java | 41 +--
.../methods/AnomalyDetectionTechnique.java} | 26 +-
.../prototype/methods/MetricAnomaly.java | 86 +++++
.../prototype/methods/ema/EmaModel.java | 124 +++++++
.../methods/ema/EmaModelLoader.java | 28 +-
.../prototype/methods/ema/EmaTechnique.java | 142 ++++++++
.../prototype/methods/hsdev/HsdevTechnique.java | 77 +++++
.../prototype/methods/kstest/KSTechnique.java | 101 ++++++
.../AbstractMetricSeries.java} | 12 +-
.../seriesgenerator/DualBandMetricSeries.java | 88 +++++
.../MetricSeriesGeneratorFactory.java | 379 +++++++++++++++++++++
.../seriesgenerator/MonotonicMetricSeries.java | 101 ++++++
.../seriesgenerator/NormalMetricSeries.java | 81 +++++
.../SteadyWithTurbulenceMetricSeries.java | 115 +++++++
.../seriesgenerator/StepFunctionMetricSeries.java | 107 ++++++
.../seriesgenerator/UniformMetricSeries.java | 95 ++++++
.../alertservice/spark/AnomalyMetricPublisher.java | 196 -----------
.../alertservice/spark/MetricAnomalyDetector.java | 147 --------
.../src/main/resources/R-scripts/hsdev.r | 12 +-
.../src/main/resources/R-scripts/kstest.r | 2 +-
.../src/main/resources/R-scripts/tukeys.r | 9 +-
.../src/main/resources/R-scripts/util.R | 36 --
.../alertservice/prototype/TestEmaTechnique.java | 86 +++++
.../prototype/TestRFunctionInvoker.java | 161 +++++++++
.../metrics/alertservice/prototype/TestTukeys.java | 100 ++++++
.../seriesgenerator/MetricSeriesGeneratorTest.java | 108 ++++++
.../metrics2/sink/timeline/TimelineMetric.java | 5 +-
.../metrics2/sink/timeline/TimelineMetrics.java | 3 +-
.../metrics/spark/MetricAnomalyDetector.scala | 18 +-
.../ambari/metrics/spark/SparkPhoenixReader.scala | 18 +-
.../ambari-metrics-timelineservice/pom.xml | 2 +-
.../timeline/HBaseTimelineMetricsService.java | 24 --
.../metrics/timeline/MetricsPaddingMethodTest.java | 7 -
56 files changed, 3794 insertions(+), 1716 deletions(-)
diff --git a/ambari-metrics/ambari-metrics-alertservice/pom.xml b/ambari-metrics/ambari-metrics-alertservice/pom.xml
index 4afc80f..4db8a6a 100644
--- a/ambari-metrics/ambari-metrics-alertservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-alertservice/pom.xml
@@ -31,7 +31,6 @@
<build>
<plugins>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
@@ -130,5 +129,21 @@
<artifactId>spark-mllib_2.10</artifactId>
<version>1.3.0</version>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ <version>4.10</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-metrics-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.2.5</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/AmsRTest.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/AmsRTest.java
deleted file mode 100644
index 2bbc250..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/AmsRTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.R;
-
-import org.apache.ambari.metrics.alertservice.common.ResultSet;
-import org.apache.ambari.metrics.alertservice.common.DataSet;
-import org.apache.commons.lang.ArrayUtils;
-import org.rosuda.JRI.REXP;
-import org.rosuda.JRI.RVector;
-import org.rosuda.JRI.Rengine;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-public class AmsRTest {
-
- public static void main(String[] args) {
-
- String metricName = "TestMetric";
- double[] ts = getTS(1000);
-
- double[] train_ts = ArrayUtils.subarray(ts, 0,750);
- double[] train_x = getData(750);
- DataSet trainData = new DataSet(metricName, train_ts, train_x);
-
- double[] test_ts = ArrayUtils.subarray(ts, 750,1000);
- double[] test_x = getData(250);
- test_x[50] = 5.5; //Anomaly
- DataSet testData = new DataSet(metricName, test_ts, test_x);
- ResultSet rs;
-
- Map<String, String> configs = new HashMap();
-
- System.out.println("TUKEYS");
- configs.put("tukeys.n", "3");
- rs = RFunctionInvoker.tukeys(trainData, testData, configs);
- rs.print();
- System.out.println("--------------");
-
- System.out.println("EMA Global");
- configs.put("ema.n", "3");
- configs.put("ema.w", "0.8");
- rs = RFunctionInvoker.ema_global(trainData, testData, configs);
- rs.print();
- System.out.println("--------------");
-
- System.out.println("EMA Daily");
- rs = RFunctionInvoker.ema_daily(trainData, testData, configs);
- rs.print();
- System.out.println("--------------");
-
- configs.put("ks.p_value", "0.05");
- System.out.println("KS Test");
- rs = RFunctionInvoker.ksTest(trainData, testData, configs);
- rs.print();
- System.out.println("--------------");
-
- ts = getTS(5000);
- train_ts = ArrayUtils.subarray(ts, 30,4800);
- train_x = getData(4800);
- trainData = new DataSet(metricName, train_ts, train_x);
- test_ts = ArrayUtils.subarray(ts, 4800,5000);
- test_x = getData(200);
- for (int i =0; i<200;i++) {
- test_x[i] = test_x[i]*5;
- }
- testData = new DataSet(metricName, test_ts, test_x);
- configs.put("hsdev.n", "3");
- configs.put("hsdev.nhp", "3");
- configs.put("hsdev.interval", "86400000");
- configs.put("hsdev.period", "604800000");
- System.out.println("HSdev");
- rs = RFunctionInvoker.hsdev(trainData, testData, configs);
- rs.print();
- System.out.println("--------------");
-
- }
-
- static double[] getTS(int n) {
- long currentTime = System.currentTimeMillis();
- double[] ts = new double[n];
- currentTime = currentTime - (currentTime % (5*60*1000));
-
- for (int i = 0,j=n-1; i<n; i++,j--) {
- ts[j] = currentTime;
- currentTime = currentTime - (5*60*1000);
- }
- return ts;
- }
-
- static void testBasic() {
- Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
- try {
- r.eval("library(ambarimetricsAD)");
- r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/test.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
- r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/util.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
- r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/tukeys.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
- double[] ts = getTS(5000);
- double[] x = getData(5000);
- r.assign("ts", ts);
- r.assign("x", x);
- r.eval("x[1000] <- 4.5");
- r.eval("x[2000] <- 4.75");
- r.eval("x[3000] <- 3.5");
- r.eval("x[4000] <- 5.5");
- r.eval("x[5000] <- 5.0");
- r.eval("data <- data.frame(ts,x)");
- r.eval("names(data) <- c(\"TS\", \"Metric\")");
- System.out.println(r.eval("data"));
- REXP exp = r.eval("t_an <- test_methods(data)");
- exp = r.eval("t_an");
- String strExp = exp.asString();
- System.out.println("result:" + exp);
- RVector cont = (RVector) exp.getContent();
- double[] an_ts = cont.at(0).asDoubleArray();
- double[] an_x = cont.at(1).asDoubleArray();
- System.out.println("result:" + strExp);
- }
- finally {
- r.end();
- }
- }
- static double[] getData(int n) {
- double[] metrics = new double[n];
- Random random = new Random();
- for (int i = 0; i<n; i++) {
- metrics[i] = random.nextDouble();
- }
- return metrics;
- }
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java
deleted file mode 100644
index 2713b71..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.R;
-
-
-import org.apache.ambari.metrics.alertservice.common.ResultSet;
-import org.apache.ambari.metrics.alertservice.common.DataSet;
-import org.rosuda.JRI.REXP;
-import org.rosuda.JRI.RVector;
-import org.rosuda.JRI.Rengine;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class RFunctionInvoker {
-
- public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
-
-
- private static void loadDataSets(Rengine r, DataSet trainData, DataSet testData) {
- r.assign("train_ts", trainData.ts);
- r.assign("train_x", trainData.values);
- r.eval("train_data <- data.frame(train_ts,train_x)");
- r.eval("names(train_data) <- c(\"TS\", " + trainData.metricName + ")");
-
- r.assign("test_ts", testData.ts);
- r.assign("test_x", testData.values);
- r.eval("test_data <- data.frame(test_ts,test_x)");
- r.eval("names(test_data) <- c(\"TS\", " + testData.metricName + ")");
- }
-
-
- public static ResultSet tukeys(DataSet trainData, DataSet testData, Map<String, String> configs) {
- try {
- r.eval("source('tukeys.r', echo=TRUE)");
-
- int n = Integer.parseInt(configs.get("tukeys.n"));
- r.eval("n <- " + n);
-
- loadDataSets(r, trainData, testData);
-
- r.eval("an <- ams_tukeys(train_data, test_data, n)");
- REXP exp = r.eval("an");
- RVector cont = (RVector) exp.getContent();
- List<double[]> result = new ArrayList();
- for (int i = 0; i< cont.size(); i++) {
- result.add(cont.at(i).asDoubleArray());
- }
- return new ResultSet(result);
- } catch(Exception e) {
- e.printStackTrace();
- } finally {
- r.end();
- }
- return null;
- }
-
- public static ResultSet ema_global(DataSet trainData, DataSet testData, Map<String, String> configs) {
- try {
- r.eval("source('ema.R', echo=TRUE)");
-
- int n = Integer.parseInt(configs.get("ema.n"));
- r.eval("n <- " + n);
-
- double w = Double.parseDouble(configs.get("ema.w"));
- r.eval("w <- " + w);
-
- loadDataSets(r, trainData, testData);
-
- r.eval("an <- ema_global(train_data, test_data, w, n)");
- REXP exp = r.eval("an");
- RVector cont = (RVector) exp.getContent();
- List<double[]> result = new ArrayList();
- for (int i = 0; i< cont.size(); i++) {
- result.add(cont.at(i).asDoubleArray());
- }
- return new ResultSet(result);
-
- } catch(Exception e) {
- e.printStackTrace();
- } finally {
- r.end();
- }
- return null;
- }
-
- public static ResultSet ema_daily(DataSet trainData, DataSet testData, Map<String, String> configs) {
- try {
- r.eval("source('ema.R', echo=TRUE)");
-
- int n = Integer.parseInt(configs.get("ema.n"));
- r.eval("n <- " + n);
-
- double w = Double.parseDouble(configs.get("ema.w"));
- r.eval("w <- " + w);
-
- loadDataSets(r, trainData, testData);
-
- r.eval("an <- ema_daily(train_data, test_data, w, n)");
- REXP exp = r.eval("an");
- RVector cont = (RVector) exp.getContent();
- List<double[]> result = new ArrayList();
- for (int i = 0; i< cont.size(); i++) {
- result.add(cont.at(i).asDoubleArray());
- }
- return new ResultSet(result);
-
- } catch(Exception e) {
- e.printStackTrace();
- } finally {
- r.end();
- }
- return null;
- }
-
- public static ResultSet ksTest(DataSet trainData, DataSet testData, Map<String, String> configs) {
- try {
- r.eval("source('kstest.r', echo=TRUE)");
-
- double p_value = Double.parseDouble(configs.get("ks.p_value"));
- r.eval("p_value <- " + p_value);
-
- loadDataSets(r, trainData, testData);
-
- r.eval("an <- ams_ks(train_data, test_data, p_value)");
- REXP exp = r.eval("an");
- RVector cont = (RVector) exp.getContent();
- List<double[]> result = new ArrayList();
- for (int i = 0; i< cont.size(); i++) {
- result.add(cont.at(i).asDoubleArray());
- }
- return new ResultSet(result);
-
- } catch(Exception e) {
- e.printStackTrace();
- } finally {
- r.end();
- }
- return null;
- }
-
- public static ResultSet hsdev(DataSet trainData, DataSet testData, Map<String, String> configs) {
- try {
- r.eval("source('hsdev.r', echo=TRUE)");
-
- int n = Integer.parseInt(configs.get("hsdev.n"));
- r.eval("n <- " + n);
-
- int nhp = Integer.parseInt(configs.get("hsdev.nhp"));
- r.eval("nhp <- " + nhp);
-
- long interval = Long.parseLong(configs.get("hsdev.interval"));
- r.eval("interval <- " + interval);
-
- long period = Long.parseLong(configs.get("hsdev.period"));
- r.eval("period <- " + period);
-
- loadDataSets(r, trainData, testData);
-
- r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)");
- REXP exp = r.eval("an2");
- RVector cont = (RVector) exp.getContent();
-
- List<double[]> result = new ArrayList();
- for (int i = 0; i< cont.size(); i++) {
- result.add(cont.at(i).asDoubleArray());
- }
- return new ResultSet(result);
- } catch(Exception e) {
- e.printStackTrace();
- } finally {
- r.end();
- }
- return null;
- }
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MetricAnomaly.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MetricAnomaly.java
deleted file mode 100644
index 4dbb425..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MetricAnomaly.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.common;
-
-public class MetricAnomaly {
-
- private String metricKey;
- private long timestamp;
- private double metricValue;
- private MethodResult methodResult;
-
- public MetricAnomaly(String metricKey, long timestamp, double metricValue, MethodResult methodResult) {
- this.metricKey = metricKey;
- this.timestamp = timestamp;
- this.metricValue = metricValue;
- this.methodResult = methodResult;
- }
-
- public String getMetricKey() {
- return metricKey;
- }
-
- public void setMetricName(String metricName) {
- this.metricKey = metricName;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
-
- public double getMetricValue() {
- return metricValue;
- }
-
- public void setMetricValue(double metricValue) {
- this.metricValue = metricValue;
- }
-
- public MethodResult getMethodResult() {
- return methodResult;
- }
-
- public void setMethodResult(MethodResult methodResult) {
- this.methodResult = methodResult;
- }
-
- public String getAnomalyAsString() {
- return metricKey + ":" + timestamp + ":" + metricValue + ":" + methodResult.prettyPrint();
- }
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/SingleValuedTimelineMetric.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/SingleValuedTimelineMetric.java
deleted file mode 100644
index acd4452..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/SingleValuedTimelineMetric.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.common;
-
-
-public class SingleValuedTimelineMetric {
- private Long timestamp;
- private Double value;
- private String metricName;
- private String appId;
- private String instanceId;
- private String hostName;
- private Long startTime;
- private String type;
-
- public void setSingleTimeseriesValue(Long timestamp, Double value) {
- this.timestamp = timestamp;
- this.value = value;
- }
-
- public SingleValuedTimelineMetric(String metricName, String appId,
- String instanceId, String hostName,
- long timestamp, long startTime, String type) {
- this.metricName = metricName;
- this.appId = appId;
- this.instanceId = instanceId;
- this.hostName = hostName;
- this.timestamp = timestamp;
- this.startTime = startTime;
- this.type = type;
- }
-
- public Long getTimestamp() {
- return timestamp;
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public String getType() {
- return type;
- }
-
- public Double getValue() {
- return value;
- }
-
- public String getMetricName() {
- return metricName;
- }
-
- public String getAppId() {
- return appId;
- }
-
- public String getInstanceId() {
- return instanceId;
- }
-
- public String getHostName() {
- return hostName;
- }
-
- public boolean equalsExceptTime(TimelineMetric metric) {
- if (!metricName.equals(metric.getMetricName())) return false;
- if (hostName != null ? !hostName.equals(metric.getHostName()) : metric.getHostName() != null)
- return false;
- if (appId != null ? !appId.equals(metric.getAppId()) : metric.getAppId() != null)
- return false;
- if (instanceId != null ? !instanceId.equals(metric.getInstanceId()) : metric.getInstanceId() != null) return false;
-
- return true;
- }
-
- public TimelineMetric getTimelineMetric() {
- TimelineMetric metric = new TimelineMetric();
- metric.setMetricName(this.metricName);
- metric.setAppId(this.appId);
- metric.setHostName(this.hostName);
- metric.setType(this.type);
- metric.setInstanceId(this.instanceId);
- metric.setStartTime(this.startTime);
- metric.setTimestamp(this.timestamp);
- metric.getMetricValues().put(timestamp, value);
- return metric;
- }
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetric.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetric.java
deleted file mode 100644
index 88ad834..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetric.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.common;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-@XmlRootElement(name = "metric")
-@XmlAccessorType(XmlAccessType.NONE)
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class TimelineMetric implements Comparable<TimelineMetric>, Serializable {
-
- private String metricName;
- private String appId;
- private String instanceId;
- private String hostName;
- private long timestamp;
- private long startTime;
- private String type;
- private String units;
- private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
- private Map<String, String> metadata = new HashMap<>();
-
- // default
- public TimelineMetric() {
-
- }
-
- public TimelineMetric(String metricName, String appId, String hostName, TreeMap<Long,Double> metricValues) {
- this.metricName = metricName;
- this.appId = appId;
- this.hostName = hostName;
- this.metricValues.putAll(metricValues);
- }
-
- // copy constructor
- public TimelineMetric(TimelineMetric metric) {
- setMetricName(metric.getMetricName());
- setType(metric.getType());
- setUnits(metric.getUnits());
- setTimestamp(metric.getTimestamp());
- setAppId(metric.getAppId());
- setInstanceId(metric.getInstanceId());
- setHostName(metric.getHostName());
- setStartTime(metric.getStartTime());
- setMetricValues(new TreeMap<Long, Double>(metric.getMetricValues()));
- }
-
- @XmlElement(name = "metricname")
- public String getMetricName() {
- return metricName;
- }
-
- public void setMetricName(String metricName) {
- this.metricName = metricName;
- }
-
- @XmlElement(name = "appid")
- public String getAppId() {
- return appId;
- }
-
- public void setAppId(String appId) {
- this.appId = appId;
- }
-
- @XmlElement(name = "instanceid")
- public String getInstanceId() {
- return instanceId;
- }
-
- public void setInstanceId(String instanceId) {
- this.instanceId = instanceId;
- }
-
- @XmlElement(name = "hostname")
- public String getHostName() {
- return hostName;
- }
-
- public void setHostName(String hostName) {
- this.hostName = hostName;
- }
-
- @XmlElement(name = "timestamp")
- public long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
-
- @XmlElement(name = "starttime")
- public long getStartTime() {
- return startTime;
- }
-
- public void setStartTime(long startTime) {
- this.startTime = startTime;
- }
-
- @XmlElement(name = "type", defaultValue = "UNDEFINED")
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- @XmlElement(name = "units")
- public String getUnits() {
- return units;
- }
-
- public void setUnits(String units) {
- this.units = units;
- }
-
- @XmlElement(name = "metrics")
- public TreeMap<Long, Double> getMetricValues() {
- return metricValues;
- }
-
- public void setMetricValues(TreeMap<Long, Double> metricValues) {
- this.metricValues = metricValues;
- }
-
- public void addMetricValues(Map<Long, Double> metricValues) {
- this.metricValues.putAll(metricValues);
- }
-
- @XmlElement(name = "metadata")
- public Map<String,String> getMetadata () {
- return metadata;
- }
-
- public void setMetadata (Map<String,String> metadata) {
- this.metadata = metadata;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- TimelineMetric metric = (TimelineMetric) o;
-
- if (!metricName.equals(metric.metricName)) return false;
- if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
- return false;
- if (appId != null ? !appId.equals(metric.appId) : metric.appId != null)
- return false;
- if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
- return false;
- if (timestamp != metric.timestamp) return false;
- if (startTime != metric.startTime) return false;
-
- return true;
- }
-
- public boolean equalsExceptTime(TimelineMetric metric) {
- if (!metricName.equals(metric.metricName)) return false;
- if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
- return false;
- if (appId != null ? !appId.equals(metric.appId) : metric.appId != null)
- return false;
- if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
- return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = metricName.hashCode();
- result = 31 * result + (appId != null ? appId.hashCode() : 0);
- result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
- result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
- result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
- return result;
- }
-
- @Override
- public int compareTo(TimelineMetric other) {
- if (timestamp > other.timestamp) {
- return -1;
- } else if (timestamp < other.timestamp) {
- return 1;
- } else {
- return metricName.compareTo(other.metricName);
- }
- }
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetrics.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetrics.java
deleted file mode 100644
index 7df6a9c..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetrics.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.common;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * The class that hosts a list of timeline entities.
- */
-@XmlRootElement(name = "metrics")
-@XmlAccessorType(XmlAccessType.NONE)
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class TimelineMetrics implements Serializable {
-
- private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
-
- public TimelineMetrics() {}
-
- @XmlElement(name = "metrics")
- public List<TimelineMetric> getMetrics() {
- return allMetrics;
- }
-
- public void setMetrics(List<TimelineMetric> allMetrics) {
- this.allMetrics = allMetrics;
- }
-
- private boolean isEqualTimelineMetrics(TimelineMetric metric1,
- TimelineMetric metric2) {
-
- boolean isEqual = true;
-
- if (!metric1.getMetricName().equals(metric2.getMetricName())) {
- return false;
- }
-
- if (metric1.getHostName() != null) {
- isEqual = metric1.getHostName().equals(metric2.getHostName());
- }
-
- if (metric1.getAppId() != null) {
- isEqual = metric1.getAppId().equals(metric2.getAppId());
- }
-
- return isEqual;
- }
-
- /**
- * Merge with existing TimelineMetric if everything except startTime is
- * the same.
- * @param metric {@link TimelineMetric}
- */
- public void addOrMergeTimelineMetric(TimelineMetric metric) {
- TimelineMetric metricToMerge = null;
-
- if (!allMetrics.isEmpty()) {
- for (TimelineMetric timelineMetric : allMetrics) {
- if (timelineMetric.equalsExceptTime(metric)) {
- metricToMerge = timelineMetric;
- break;
- }
- }
- }
-
- if (metricToMerge != null) {
- metricToMerge.addMetricValues(metric.getMetricValues());
- if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
- metricToMerge.setTimestamp(metric.getTimestamp());
- }
- if (metricToMerge.getStartTime() > metric.getStartTime()) {
- metricToMerge.setStartTime(metric.getStartTime());
- }
- } else {
- allMetrics.add(metric);
- }
- }
-
- // Optimization that addresses too many TreeMaps from getting created.
- public void addOrMergeTimelineMetric(SingleValuedTimelineMetric metric) {
- TimelineMetric metricToMerge = null;
-
- if (!allMetrics.isEmpty()) {
- for (TimelineMetric timelineMetric : allMetrics) {
- if (metric.equalsExceptTime(timelineMetric)) {
- metricToMerge = timelineMetric;
- break;
- }
- }
- }
-
- if (metricToMerge != null) {
- metricToMerge.getMetricValues().put(metric.getTimestamp(), metric.getValue());
- if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
- metricToMerge.setTimestamp(metric.getTimestamp());
- }
- if (metricToMerge.getStartTime() > metric.getStartTime()) {
- metricToMerge.setStartTime(metric.getStartTime());
- }
- } else {
- allMetrics.add(metric.getTimelineMetric());
- }
- }
-}
-
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaDS.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaDS.java
deleted file mode 100644
index 32cd96b..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaDS.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.methods.ema;
-
-import com.sun.org.apache.commons.logging.Log;
-import com.sun.org.apache.commons.logging.LogFactory;
-
-import javax.xml.bind.annotation.XmlRootElement;
-import java.io.Serializable;
-
-@XmlRootElement
-public class EmaDS implements Serializable {
-
- String metricName;
- String appId;
- String hostname;
- double ema;
- double ems;
- double weight;
- int timessdev;
- private static final Log LOG = LogFactory.getLog(EmaDS.class);
-
- public EmaDS(String metricName, String appId, String hostname, double weight, int timessdev) {
- this.metricName = metricName;
- this.appId = appId;
- this.hostname = hostname;
- this.weight = weight;
- this.timessdev = timessdev;
- this.ema = 0.0;
- this.ems = 0.0;
- }
-
-
- public EmaResult testAndUpdate(double metricValue) {
-
- double diff = Math.abs(ema - metricValue) - (timessdev * ems);
-
- ema = weight * ema + (1 - weight) * metricValue;
- ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0));
- LOG.info(ema + ", " + ems);
- return diff > 0 ? new EmaResult(diff) : null;
- }
-
- public void update(double metricValue) {
- ema = weight * ema + (1 - weight) * metricValue;
- ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0));
- LOG.info(ema + ", " + ems);
- }
-
- public EmaResult test(double metricValue) {
- double diff = Math.abs(ema - metricValue) - (timessdev * ems);
- return diff > 0 ? new EmaResult(diff) : null;
- }
-
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModel.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModel.java
deleted file mode 100644
index 13a0f55..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModel.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.methods.ema;
-
-import com.google.gson.Gson;
-import com.sun.org.apache.commons.logging.Log;
-import com.sun.org.apache.commons.logging.LogFactory;
-import org.apache.ambari.metrics.alertservice.common.MethodResult;
-import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
-import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel;
-import org.apache.spark.SparkContext;
-import org.apache.spark.mllib.util.Saveable;
-
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.io.*;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@XmlRootElement
-public class EmaModel implements MetricAnomalyModel, Saveable, Serializable {
-
- @XmlElement(name = "trackedEmas")
- private Map<String, EmaDS> trackedEmas = new HashMap<>();
- private static final Log LOG = LogFactory.getLog(EmaModel.class);
-
- public List<MetricAnomaly> onNewMetric(TimelineMetric metric) {
-
- String metricName = metric.getMetricName();
- String appId = metric.getAppId();
- String hostname = metric.getHostName();
- String key = metricName + "_" + appId + "_" + hostname;
- List<MetricAnomaly> anomalies = new ArrayList<>();
-
- if (!trackedEmas.containsKey(metricName)) {
- trackedEmas.put(key, new EmaDS(metricName, appId, hostname, 0.8, 3));
- }
-
- EmaDS emaDS = trackedEmas.get(key);
- for (Long timestamp : metric.getMetricValues().keySet()) {
- double metricValue = metric.getMetricValues().get(timestamp);
- MethodResult result = emaDS.testAndUpdate(metricValue);
- if (result != null) {
- MetricAnomaly metricAnomaly = new MetricAnomaly(key,timestamp, metricValue, result);
- anomalies.add(metricAnomaly);
- }
- }
- return anomalies;
- }
-
- public EmaDS train(TimelineMetric metric, double weight, int timessdev) {
-
- String metricName = metric.getMetricName();
- String appId = metric.getAppId();
- String hostname = metric.getHostName();
- String key = metricName + "_" + appId + "_" + hostname;
-
- EmaDS emaDS = new EmaDS(metric.getMetricName(), metric.getAppId(), metric.getHostName(), weight, timessdev);
- LOG.info("In EMA Train step");
- for (Long timestamp : metric.getMetricValues().keySet()) {
- emaDS.update(metric.getMetricValues().get(timestamp));
- }
- trackedEmas.put(key, emaDS);
- return emaDS;
- }
-
- public List<MetricAnomaly> test(TimelineMetric metric) {
- String metricName = metric.getMetricName();
- String appId = metric.getAppId();
- String hostname = metric.getHostName();
- String key = metricName + "_" + appId + "_" + hostname;
-
- EmaDS emaDS = trackedEmas.get(key);
-
- if (emaDS == null) {
- return new ArrayList<>();
- }
-
- List<MetricAnomaly> anomalies = new ArrayList<>();
-
- for (Long timestamp : metric.getMetricValues().keySet()) {
- double metricValue = metric.getMetricValues().get(timestamp);
- MethodResult result = emaDS.testAndUpdate(metricValue);
- if (result != null) {
- MetricAnomaly metricAnomaly = new MetricAnomaly(key,timestamp, metricValue, result);
- anomalies.add(metricAnomaly);
- }
- }
- return anomalies;
- }
-
- @Override
- public void save(SparkContext sc, String path) {
- Gson gson = new Gson();
- try {
- String json = gson.toJson(this);
- try (Writer writer = new BufferedWriter(new OutputStreamWriter(
- new FileOutputStream(path), "utf-8"))) {
- writer.write(json);
- } } catch (IOException e) {
- LOG.error(e);
- }
- }
-
- @Override
- public String formatVersion() {
- return "1.0";
- }
-
-}
-
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/TestEmaModel.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/TestEmaModel.java
deleted file mode 100644
index b851dab..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/TestEmaModel.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.methods.ema;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.gson.Gson;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
-
-import java.io.*;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class TestEmaModel {
-
- public static void main(String[] args) throws IOException {
-
- long now = System.currentTimeMillis();
- TimelineMetric metric1 = new TimelineMetric();
- metric1.setMetricName("dummy_metric");
- metric1.setHostName("dummy_host");
- metric1.setTimestamp(now);
- metric1.setStartTime(now - 1000);
- metric1.setAppId("HOST");
- metric1.setType("Integer");
-
- TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
-
- for (int i = 0; i<20;i++) {
- double metric = 9 + Math.random();
- metricValues.put(now - i*100, metric);
- }
- metric1.setMetricValues(metricValues);
-
- EmaModel emaModel = new EmaModel();
-
- emaModel.train(metric1, 0.8, 3);
- }
-
- /*
- {{
- put(now - 100, 1.20);
- put(now - 200, 1.25);
- put(now - 300, 1.30);
- put(now - 400, 4.50);
- put(now - 500, 1.35);
- put(now - 400, 5.50);
- }}
- */
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java
new file mode 100644
index 0000000..0c1c6fc
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+public class AmbariServerInterface implements Serializable{
+
+ private static final Log LOG = LogFactory.getLog(AmbariServerInterface.class);
+
+ private String ambariServerHost;
+ private String clusterName;
+
+ public AmbariServerInterface(String ambariServerHost, String clusterName) {
+ this.ambariServerHost = ambariServerHost;
+ this.clusterName = clusterName;
+ }
+
+ public int getPointInTimeSensitivity() {
+
+ String url = constructUri("http", ambariServerHost, "8080", "/api/v1/clusters/" + clusterName + "/alert_definitions?fields=*");
+
+ URL obj = null;
+ BufferedReader in = null;
+
+ try {
+ obj = new URL(url);
+ HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+ con.setRequestMethod("GET");
+
+ String encoded = Base64.getEncoder().encodeToString(("admin:admin").getBytes(StandardCharsets.UTF_8));
+ con.setRequestProperty("Authorization", "Basic "+encoded);
+
+ int responseCode = con.getResponseCode();
+ LOG.info("Sending 'GET' request to URL : " + url);
+ LOG.info("Response Code : " + responseCode);
+
+ in = new BufferedReader(
+ new InputStreamReader(con.getInputStream()));
+
+ StringBuilder responseJsonSb = new StringBuilder();
+ String line;
+ while ((line = in.readLine()) != null) {
+ responseJsonSb.append(line);
+ }
+
+ JSONObject jsonObject = new JSONObject(responseJsonSb.toString());
+ JSONArray array = jsonObject.getJSONArray("items");
+ for(int i = 0 ; i < array.length() ; i++){
+ JSONObject alertDefn = array.getJSONObject(i).getJSONObject("AlertDefinition");
+ LOG.info("alertDefn : " + alertDefn.get("name"));
+ if (alertDefn.get("name") != null && alertDefn.get("name").equals("point_in_time_metrics_anomalies")) {
+ JSONObject sourceNode = alertDefn.getJSONObject("source");
+ JSONArray params = sourceNode.getJSONArray("parameters");
+ for(int j = 0 ; j < params.length() ; j++){
+ JSONObject param = params.getJSONObject(j);
+ if (param.get("name").equals("sensitivity")) {
+ return param.getInt("value");
+ }
+ }
+ break;
+ }
+ }
+
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ LOG.warn(e);
+ }
+ }
+ }
+
+ return -1;
+ }
+
+ private String constructUri(String protocol, String host, String port, String path) {
+ StringBuilder sb = new StringBuilder(protocol);
+ sb.append("://");
+ sb.append(host);
+ sb.append(":");
+ sb.append(port);
+ sb.append(path);
+ return sb.toString();
+ }
+
+// public static void main(String[] args) {
+// AmbariServerInterface ambariServerInterface = new AmbariServerInterface();
+// ambariServerInterface.getPointInTimeSensitivity("avijayan-ams-1.openstacklocal","c1");
+// }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyDetectorTestInput.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyDetectorTestInput.java
new file mode 100644
index 0000000..490328a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyDetectorTestInput.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.List;
+import java.util.Map;
+
+@XmlRootElement
+public class MetricAnomalyDetectorTestInput {
+
+ public MetricAnomalyDetectorTestInput() {
+ }
+
+ //Train data
+ private String trainDataName;
+ private String trainDataType;
+ private Map<String, String> trainDataConfigs;
+ private int trainDataSize;
+
+ //Test data
+ private String testDataName;
+ private String testDataType;
+ private Map<String, String> testDataConfigs;
+ private int testDataSize;
+
+ //Algorithm data
+ private List<String> methods;
+ private Map<String, String> methodConfigs;
+
+ public String getTrainDataName() {
+ return trainDataName;
+ }
+
+ public void setTrainDataName(String trainDataName) {
+ this.trainDataName = trainDataName;
+ }
+
+ public String getTrainDataType() {
+ return trainDataType;
+ }
+
+ public void setTrainDataType(String trainDataType) {
+ this.trainDataType = trainDataType;
+ }
+
+ public Map<String, String> getTrainDataConfigs() {
+ return trainDataConfigs;
+ }
+
+ public void setTrainDataConfigs(Map<String, String> trainDataConfigs) {
+ this.trainDataConfigs = trainDataConfigs;
+ }
+
+ public String getTestDataName() {
+ return testDataName;
+ }
+
+ public void setTestDataName(String testDataName) {
+ this.testDataName = testDataName;
+ }
+
+ public String getTestDataType() {
+ return testDataType;
+ }
+
+ public void setTestDataType(String testDataType) {
+ this.testDataType = testDataType;
+ }
+
+ public Map<String, String> getTestDataConfigs() {
+ return testDataConfigs;
+ }
+
+ public void setTestDataConfigs(Map<String, String> testDataConfigs) {
+ this.testDataConfigs = testDataConfigs;
+ }
+
+ public Map<String, String> getMethodConfigs() {
+ return methodConfigs;
+ }
+
+ public void setMethodConfigs(Map<String, String> methodConfigs) {
+ this.methodConfigs = methodConfigs;
+ }
+
+ public int getTrainDataSize() {
+ return trainDataSize;
+ }
+
+ public void setTrainDataSize(int trainDataSize) {
+ this.trainDataSize = trainDataSize;
+ }
+
+ public int getTestDataSize() {
+ return testDataSize;
+ }
+
+ public void setTestDataSize(int testDataSize) {
+ this.testDataSize = testDataSize;
+ }
+
+ public List<String> getMethods() {
+ return methods;
+ }
+
+ public void setMethods(List<String> methods) {
+ this.methods = methods;
+ }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyTester.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyTester.java
new file mode 100644
index 0000000..bff8120
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyTester.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.seriesgenerator.MetricSeriesGeneratorFactory;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class MetricAnomalyTester {
+
+ public static String appId = MetricsCollectorInterface.serviceName;
+ static final Log LOG = LogFactory.getLog(MetricAnomalyTester.class);
+ static Map<String, TimelineMetric> timelineMetricMap = new HashMap<>();
+
+ public static TimelineMetrics runTestAnomalyRequest(MetricAnomalyDetectorTestInput input) throws UnknownHostException {
+
+ long currentTime = System.currentTimeMillis();
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ String hostname = InetAddress.getLocalHost().getHostName();
+
+ //Train data
+ TimelineMetric metric1 = new TimelineMetric();
+ if (StringUtils.isNotEmpty(input.getTrainDataName())) {
+ metric1 = timelineMetricMap.get(input.getTrainDataName());
+ if (metric1 == null) {
+ metric1 = new TimelineMetric();
+ double[] trainSeries = MetricSeriesGeneratorFactory.generateSeries(input.getTrainDataType(), input.getTrainDataSize(), input.getTrainDataConfigs());
+ metric1.setMetricName(input.getTrainDataName());
+ metric1.setAppId(appId);
+ metric1.setHostName(hostname);
+ metric1.setStartTime(currentTime);
+ metric1.setInstanceId(null);
+ metric1.setMetricValues(getAsTimeSeries(currentTime, trainSeries));
+ timelineMetricMap.put(input.getTrainDataName(), metric1);
+ }
+ timelineMetrics.getMetrics().add(metric1);
+ } else {
+ LOG.error("No train data name specified");
+ }
+
+ //Test data
+ TimelineMetric metric2 = new TimelineMetric();
+ if (StringUtils.isNotEmpty(input.getTestDataName())) {
+ metric2 = timelineMetricMap.get(input.getTestDataName());
+ if (metric2 == null) {
+ metric2 = new TimelineMetric();
+ double[] testSeries = MetricSeriesGeneratorFactory.generateSeries(input.getTestDataType(), input.getTestDataSize(), input.getTestDataConfigs());
+ metric2.setMetricName(input.getTestDataName());
+ metric2.setAppId(appId);
+ metric2.setHostName(hostname);
+ metric2.setStartTime(currentTime);
+ metric2.setInstanceId(null);
+ metric2.setMetricValues(getAsTimeSeries(currentTime, testSeries));
+ timelineMetricMap.put(input.getTestDataName(), metric2);
+ }
+ timelineMetrics.getMetrics().add(metric2);
+ } else {
+ LOG.warn("No test data name specified");
+ }
+
+ //Invoke method
+ if (CollectionUtils.isNotEmpty(input.getMethods())) {
+ RFunctionInvoker.setScriptsDir("/etc/ambari-metrics-collector/conf/R-scripts");
+ for (String methodType : input.getMethods()) {
+ ResultSet result = RFunctionInvoker.executeMethod(methodType, getAsDataSeries(metric1), getAsDataSeries(metric2), input.getMethodConfigs());
+ TimelineMetric timelineMetric = getAsTimelineMetric(result, methodType, input, currentTime, hostname);
+ if (timelineMetric != null) {
+ timelineMetrics.getMetrics().add(timelineMetric);
+ }
+ }
+ } else {
+ LOG.warn("No anomaly method requested");
+ }
+
+ return timelineMetrics;
+ }
+
+
+ private static TimelineMetric getAsTimelineMetric(ResultSet result, String methodType, MetricAnomalyDetectorTestInput input, long currentTime, String hostname) {
+
+ if (result == null) {
+ return null;
+ }
+
+ TimelineMetric timelineMetric = new TimelineMetric();
+ if (methodType.equals("tukeys") || methodType.equals("ema")) {
+ timelineMetric.setMetricName(input.getTrainDataName() + "_" + input.getTestDataName() + "_" + methodType + "_" + currentTime);
+ timelineMetric.setHostName(hostname);
+ timelineMetric.setAppId(appId);
+ timelineMetric.setInstanceId(null);
+ timelineMetric.setStartTime(currentTime);
+
+ TreeMap<Long, Double> metricValues = new TreeMap<>();
+ if (result.resultset.size() > 0) {
+ double[] ts = result.resultset.get(0);
+ double[] metrics = result.resultset.get(1);
+ for (int i = 0; i < ts.length; i++) {
+ if (i == 0) {
+ timelineMetric.setStartTime((long) ts[i]);
+ }
+ metricValues.put((long) ts[i], metrics[i]);
+ }
+ }
+ timelineMetric.setMetricValues(metricValues);
+ return timelineMetric;
+ }
+ return null;
+ }
+
+
+ private static TreeMap<Long, Double> getAsTimeSeries(long currentTime, double[] values) {
+
+ long startTime = currentTime - (values.length - 1) * 60 * 1000;
+ TreeMap<Long, Double> metricValues = new TreeMap<>();
+
+ for (int i = 0; i < values.length; i++) {
+ metricValues.put(startTime, values[i]);
+ startTime += (60 * 1000);
+ }
+ return metricValues;
+ }
+
+ private static DataSeries getAsDataSeries(TimelineMetric timelineMetric) {
+
+ TreeMap<Long, Double> metricValues = timelineMetric.getMetricValues();
+ double[] timestamps = new double[metricValues.size()];
+ double[] values = new double[metricValues.size()];
+ int i = 0;
+
+ for (Long timestamp : metricValues.keySet()) {
+ timestamps[i] = timestamp;
+ values[i++] = metricValues.get(timestamp);
+ }
+ return new DataSeries(timelineMetric.getMetricName() + "_" + timelineMetric.getAppId() + "_" + timelineMetric.getHostName(), timestamps, values);
+ }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricKafkaProducer.java
similarity index 56%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricKafkaProducer.java
index daaee5c..8023d15 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricKafkaProducer.java
@@ -15,25 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ambari.metrics.alertservice.spark;
+package org.apache.ambari.metrics.alertservice.prototype;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
-import org.apache.kafka.clients.producer.*;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
-import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-public class AmsKafkaProducer {
+public class MetricKafkaProducer {
Producer producer;
private static String topicName = "ambari-metrics-topic";
- public AmsKafkaProducer(String kafkaServers) {
+ public MetricKafkaProducer(String kafkaServers) {
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); //"avijayan-ams-2.openstacklocal:6667"
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
@@ -51,42 +53,4 @@ public class AmsKafkaProducer {
System.out.println(kafkaFuture.isDone());
System.out.println(kafkaFuture.get().topic());
}
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- final long now = System.currentTimeMillis();
-
- TimelineMetrics timelineMetrics = new TimelineMetrics();
- TimelineMetric metric1 = new TimelineMetric();
- metric1.setMetricName("mem_free");
- metric1.setHostName("avijayan-ams-3.openstacklocal");
- metric1.setTimestamp(now);
- metric1.setStartTime(now - 1000);
- metric1.setAppId("HOST");
- metric1.setType("Integer");
-
- TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
-
- for (int i = 0; i<20;i++) {
- double metric = 20000 + Math.random();
- metricValues.put(now - i*100, metric);
- }
-
- metric1.setMetricValues(metricValues);
-
-// metric1.setMetricValues(new TreeMap<Long, Double>() {{
-// put(now - 100, 1.20);
-// put(now - 200, 11.25);
-// put(now - 300, 1.30);
-// put(now - 400, 4.50);
-// put(now - 500, 16.35);
-// put(now - 400, 5.50);
-// }});
-
- timelineMetrics.getMetrics().add(metric1);
-
- for (int i = 0; i<1; i++) {
- new AmsKafkaProducer("avijayan-ams-2.openstacklocal:6667").sendMetrics(timelineMetrics);
- Thread.sleep(1000);
- }
- }
}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
new file mode 100644
index 0000000..7735d6c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+import scala.Tuple2;
+
+import java.util.*;
+
+public class MetricSparkConsumer {
+
+ private static final Log LOG = LogFactory.getLog(MetricSparkConsumer.class);
+ private static String groupId = "ambari-metrics-group";
+ private static String topicName = "ambari-metrics-topic";
+ private static int numThreads = 1;
+ private static long pitStartTime = System.currentTimeMillis();
+ private static long ksStartTime = pitStartTime;
+ private static long hdevStartTime = ksStartTime;
+
+ public MetricSparkConsumer() {
+ }
+
+ public static void main(String[] args) throws InterruptedException {
+
+ if (args.length < 5) {
+ System.err.println("Usage: MetricSparkConsumer <appid1,appid2> <collector_host> <port> <protocol> <zkQuorum>");
+ System.exit(1);
+ }
+
+ List<String> appIds = Arrays.asList(args[0].split(","));
+ String collectorHost = args[1];
+ String collectorPort = args[2];
+ String collectorProtocol = args[3];
+ String zkQuorum = args[4];
+
+ double emaW = StringUtils.isNotEmpty(args[5]) ? Double.parseDouble(args[5]) : 0.5;
+ double emaN = StringUtils.isNotEmpty(args[8]) ? Double.parseDouble(args[6]) : 3;
+ double tukeysN = StringUtils.isNotEmpty(args[7]) ? Double.parseDouble(args[7]) : 3;
+
+ long pitTestInterval = StringUtils.isNotEmpty(args[8]) ? Long.parseLong(args[8]) : 5 * 60 * 1000;
+ long pitTrainInterval = StringUtils.isNotEmpty(args[9]) ? Long.parseLong(args[9]) : 15 * 60 * 1000;
+
+ String fileName = args[10];
+ long ksTestInterval = StringUtils.isNotEmpty(args[11]) ? Long.parseLong(args[11]) : 10 * 60 * 1000;
+ long ksTrainInterval = StringUtils.isNotEmpty(args[12]) ? Long.parseLong(args[12]) : 10 * 60 * 1000;
+ int hsdevNhp = StringUtils.isNotEmpty(args[13]) ? Integer.parseInt(args[13]) : 3;
+ long hsdevInterval = StringUtils.isNotEmpty(args[14]) ? Long.parseLong(args[14]) : 30 * 60 * 1000;
+
+ String ambariServerHost = args[15];
+ String clusterName = args[16];
+
+ MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort);
+
+ SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector");
+
+ JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
+
+ EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN);
+ PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(metricsCollectorInterface,
+ tukeysN,
+ pitTestInterval,
+ pitTrainInterval,
+ ambariServerHost,
+ clusterName);
+
+ TrendADSystem trendADSystem = new TrendADSystem(metricsCollectorInterface,
+ ksTestInterval,
+ ksTrainInterval,
+ hsdevNhp,
+ fileName);
+
+ Broadcast<EmaTechnique> emaTechniqueBroadcast = jssc.sparkContext().broadcast(emaTechnique);
+ Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast = jssc.sparkContext().broadcast(pointInTimeADSystem);
+ Broadcast<TrendADSystem> trendADSystemBroadcast = jssc.sparkContext().broadcast(trendADSystem);
+ Broadcast<MetricsCollectorInterface> metricsCollectorInterfaceBroadcast = jssc.sparkContext().broadcast(metricsCollectorInterface);
+
+ JavaPairReceiverInputDStream<String, String> messages =
+ KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));
+
+ //Convert JSON string to TimelineMetrics.
+ JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() {
+ @Override
+ public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class);
+ return metrics;
+ }
+ });
+
+ timelineMetricsStream.print();
+
+ //Group TimelineMetric by AppId.
+ JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
+ timelineMetrics -> timelineMetrics.getMetrics().isEmpty() ? new Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics)
+ );
+
+ appMetricStream.print();
+
+ //Filter AppIds that are not needed.
+ JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
+ @Override
+ public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
+ return appIds.contains(appMetricTuple._1);
+ }
+ });
+
+ filteredAppMetricStream.print();
+
+ filteredAppMetricStream.foreachRDD(rdd -> {
+ rdd.foreach(
+ tuple2 -> {
+ long currentTime = System.currentTimeMillis();
+ EmaTechnique ema = emaTechniqueBroadcast.getValue();
+ if (currentTime > pitStartTime + pitTestInterval) {
+ LOG.info("Running Tukeys....");
+ pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime);
+ pitStartTime = pitStartTime + pitTestInterval;
+ }
+
+ if (currentTime > ksStartTime + ksTestInterval) {
+ LOG.info("Running KS Test....");
+ trendADSystemBroadcast.getValue().runKSTest(currentTime);
+ ksStartTime = ksStartTime + ksTestInterval;
+ }
+
+ if (currentTime > hdevStartTime + hsdevInterval) {
+ LOG.info("Running HSdev Test....");
+ trendADSystemBroadcast.getValue().runHsdevMethod();
+ hdevStartTime = hdevStartTime + hsdevInterval;
+ }
+
+ TimelineMetrics metrics = tuple2._2();
+ for (TimelineMetric timelineMetric : metrics.getMetrics()) {
+ List<MetricAnomaly> anomalies = ema.test(timelineMetric);
+ metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
+ }
+ });
+ });
+
+ jssc.start();
+ jssc.awaitTermination();
+ }
+}
+
+
+
+
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
new file mode 100644
index 0000000..7b3f63d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeMap;
+
+public class MetricsCollectorInterface implements Serializable {
+
+ private static String hostName = null;
+ private String instanceId = null;
+ public final static String serviceName = "anomaly-engine";
+ private String collectorHost;
+ private String protocol;
+ private String port;
+ private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
+ private static final Log LOG = LogFactory.getLog(MetricsCollectorInterface.class);
+ private static ObjectMapper mapper;
+ private final static ObjectReader timelineObjectReader;
+
+ static {
+ mapper = new ObjectMapper();
+ AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+ mapper.setAnnotationIntrospector(introspector);
+ mapper.getSerializationConfig()
+ .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+ timelineObjectReader = mapper.reader(TimelineMetrics.class);
+ }
+
+ public MetricsCollectorInterface(String collectorHost, String protocol, String port) {
+ this.collectorHost = collectorHost;
+ this.protocol = protocol;
+ this.port = port;
+ this.hostName = getDefaultLocalHostName();
+ }
+
+ public static String getDefaultLocalHostName() {
+
+ if (hostName != null) {
+ return hostName;
+ }
+
+ try {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ LOG.info("Error getting host address");
+ }
+ return null;
+ }
+
+ public void publish(List<MetricAnomaly> metricAnomalies) {
+ if (CollectionUtils.isNotEmpty(metricAnomalies)) {
+ LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
+ List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
+ if (!metricList.isEmpty()) {
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.setMetrics(metricList);
+ emitMetrics(timelineMetrics);
+ }
+ } else {
+ LOG.info("No anomalies to send.");
+ }
+ }
+
+ private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
+ List<TimelineMetric> metrics = new ArrayList<>();
+
+ if (metricAnomalies.isEmpty()) {
+ return metrics;
+ }
+
+ for (MetricAnomaly anomaly : metricAnomalies) {
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(anomaly.getMetricKey());
+ timelineMetric.setAppId(serviceName + "-" + anomaly.getMethodType());
+ timelineMetric.setInstanceId(null);
+ timelineMetric.setHostName(getDefaultLocalHostName());
+ timelineMetric.setStartTime(anomaly.getTimestamp());
+ HashMap<String, String> metadata = new HashMap<>();
+ metadata.put("method", anomaly.getMethodType());
+ metadata.put("anomaly-score", String.valueOf(anomaly.getAnomalyScore()));
+ timelineMetric.setMetadata(metadata);
+ TreeMap<Long,Double> metricValues = new TreeMap<>();
+ metricValues.put(anomaly.getTimestamp(), anomaly.getMetricValue());
+ timelineMetric.setMetricValues(metricValues);
+
+ metrics.add(timelineMetric);
+ }
+ return metrics;
+ }
+
+ public boolean emitMetrics(TimelineMetrics metrics) {
+ String connectUrl = constructTimelineMetricUri();
+ String jsonData = null;
+ LOG.info("EmitMetrics connectUrl = " + connectUrl);
+ try {
+ jsonData = mapper.writeValueAsString(metrics);
+ LOG.info(jsonData);
+ } catch (IOException e) {
+ LOG.error("Unable to parse metrics", e);
+ }
+ if (jsonData != null) {
+ return emitMetricsJson(connectUrl, jsonData);
+ }
+ return false;
+ }
+
+ private HttpURLConnection getConnection(String spec) throws IOException {
+ return (HttpURLConnection) new URL(spec).openConnection();
+ }
+
+ private boolean emitMetricsJson(String connectUrl, String jsonData) {
+ int timeout = 10000;
+ HttpURLConnection connection = null;
+ try {
+ if (connectUrl == null) {
+ throw new IOException("Unknown URL. Unable to connect to metrics collector.");
+ }
+ connection = getConnection(connectUrl);
+
+ connection.setRequestMethod("POST");
+ connection.setRequestProperty("Content-Type", "application/json");
+ connection.setRequestProperty("Connection", "Keep-Alive");
+ connection.setConnectTimeout(timeout);
+ connection.setReadTimeout(timeout);
+ connection.setDoOutput(true);
+
+ if (jsonData != null) {
+ try (OutputStream os = connection.getOutputStream()) {
+ os.write(jsonData.getBytes("UTF-8"));
+ }
+ }
+
+ int statusCode = connection.getResponseCode();
+
+ if (statusCode != 200) {
+ LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
+ "statusCode = " + statusCode);
+ } else {
+ LOG.info("Metrics posted to Collector " + connectUrl);
+ }
+ return true;
+ } catch (IOException ioe) {
+ LOG.error(ioe.getMessage());
+ }
+ return false;
+ }
+
+ private String constructTimelineMetricUri() {
+ StringBuilder sb = new StringBuilder(protocol);
+ sb.append("://");
+ sb.append(collectorHost);
+ sb.append(":");
+ sb.append(port);
+ sb.append(WS_V1_TIMELINE_METRICS);
+ return sb.toString();
+ }
+
+ public TimelineMetrics fetchMetrics(String metricName,
+ String appId,
+ String hostname,
+ long startime,
+ long endtime) {
+
+ String url = constructTimelineMetricUri() + "?metricNames=" + metricName + "&appId=" + appId +
+ "&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + endtime;
+ LOG.info("Fetch metrics URL : " + url);
+
+ URL obj = null;
+ BufferedReader in = null;
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+ try {
+ obj = new URL(url);
+ HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+ con.setRequestMethod("GET");
+ int responseCode = con.getResponseCode();
+ LOG.info("Sending 'GET' request to URL : " + url);
+ LOG.info("Response Code : " + responseCode);
+
+ in = new BufferedReader(
+ new InputStreamReader(con.getInputStream()));
+ timelineMetrics = timelineObjectReader.readValue(in);
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ LOG.warn(e);
+ }
+ }
+ }
+
+ LOG.info("Fetched " + timelineMetrics.getMetrics().size() + " metrics.");
+ return timelineMetrics;
+ }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
new file mode 100644
index 0000000..b4a8593
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaModel;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class PointInTimeADSystem implements Serializable {
+
+ //private EmaTechnique emaTechnique;
+ private MetricsCollectorInterface metricsCollectorInterface;
+ private Map<String, Double> tukeysNMap;
+ private double defaultTukeysN = 3;
+
+ private long testIntervalMillis = 5*60*1000; //10mins
+ private long trainIntervalMillis = 15*60*1000; //1hour
+
+ private static final Log LOG = LogFactory.getLog(PointInTimeADSystem.class);
+
+ private AmbariServerInterface ambariServerInterface;
+ private int sensitivity = 50;
+ private int minSensitivity = 0;
+ private int maxSensitivity = 10;
+
+ public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN,
+ long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) {
+ this.metricsCollectorInterface = metricsCollectorInterface;
+ this.defaultTukeysN = defaultTukeysN;
+ this.tukeysNMap = new HashMap<>();
+ this.testIntervalMillis = testIntervalMillis;
+ this.trainIntervalMillis = trainIntervalMillis;
+ this.ambariServerInterface = new AmbariServerInterface(ambariServerHost, clusterName);
+ LOG.info("Starting PointInTimeADSystem...");
+ }
+
+ public void runTukeysAndRefineEma(EmaTechnique emaTechnique, long startTime) {
+ LOG.info("Running Tukeys for test data interval [" + new Date(startTime - testIntervalMillis) + " : " + new Date(startTime) + "], with train data period [" + new Date(startTime - testIntervalMillis - trainIntervalMillis) + " : " + new Date(startTime - testIntervalMillis) + "]");
+
+ int requiredSensivity = ambariServerInterface.getPointInTimeSensitivity();
+ if (requiredSensivity == -1 || requiredSensivity == sensitivity) {
+ LOG.info("No change in sensitivity needed.");
+ } else {
+ LOG.info("Current tukey's N value = " + defaultTukeysN);
+ if (requiredSensivity > sensitivity) {
+ int targetSensitivity = Math.min(maxSensitivity, requiredSensivity);
+ while (sensitivity < targetSensitivity) {
+ defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.1;
+ sensitivity++;
+ }
+ } else {
+ int targetSensitivity = Math.max(minSensitivity, requiredSensivity);
+ while (sensitivity > targetSensitivity) {
+ defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.1;
+ sensitivity--;
+ }
+ }
+ LOG.info("New tukey's N value = " + defaultTukeysN);
+ }
+
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ for (String metricKey : emaTechnique.getTrackedEmas().keySet()) {
+ LOG.info("EMA key = " + metricKey);
+ EmaModel emaModel = emaTechnique.getTrackedEmas().get(metricKey);
+ String metricName = emaModel.getMetricName();
+ String appId = emaModel.getAppId();
+ String hostname = emaModel.getHostname();
+
+ TimelineMetrics tukeysData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, startTime - (testIntervalMillis + trainIntervalMillis),
+ startTime);
+
+ if (tukeysData.getMetrics().isEmpty()) {
+ LOG.info("No metrics fetched for Tukeys, metricKey = " + metricKey);
+ continue;
+ }
+
+ List<Double> trainTsList = new ArrayList<>();
+ List<Double> trainDataList = new ArrayList<>();
+ List<Double> testTsList = new ArrayList<>();
+ List<Double> testDataList = new ArrayList<>();
+
+ for (TimelineMetric metric : tukeysData.getMetrics()) {
+ for (Long timestamp : metric.getMetricValues().keySet()) {
+ if (timestamp <= (startTime - testIntervalMillis)) {
+ trainDataList.add(metric.getMetricValues().get(timestamp));
+ trainTsList.add((double)timestamp);
+ } else {
+ testDataList.add(metric.getMetricValues().get(timestamp));
+ testTsList.add((double)timestamp);
+ }
+ }
+ }
+
+ if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+ LOG.info("Not enough train/test data to perform analysis.");
+ continue;
+ }
+
+ String tukeysTrainSeries = "tukeysTrainSeries";
+ double[] trainTs = new double[trainTsList.size()];
+ double[] trainData = new double[trainTsList.size()];
+ for (int i = 0; i < trainTs.length; i++) {
+ trainTs[i] = trainTsList.get(i);
+ trainData[i] = trainDataList.get(i);
+ }
+
+ String tukeysTestSeries = "tukeysTestSeries";
+ double[] testTs = new double[testTsList.size()];
+ double[] testData = new double[testTsList.size()];
+ for (int i = 0; i < testTs.length; i++) {
+ testTs[i] = testTsList.get(i);
+ testData[i] = testDataList.get(i);
+ }
+
+ LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
+
+ DataSeries tukeysTrainData = new DataSeries(tukeysTrainSeries, trainTs, trainData);
+ DataSeries tukeysTestData = new DataSeries(tukeysTestSeries, testTs, testData);
+
+ if (!tukeysNMap.containsKey(metricKey)) {
+ tukeysNMap.put(metricKey, defaultTukeysN);
+ }
+
+ Map<String, String> configs = new HashMap<>();
+ configs.put("tukeys.n", String.valueOf(tukeysNMap.get(metricKey)));
+
+ ResultSet rs = RFunctionInvoker.tukeys(tukeysTrainData, tukeysTestData, configs);
+
+ List<TimelineMetric> tukeysMetrics = getAsTimelineMetric(rs, metricName, appId, hostname);
+ LOG.info("Tukeys anomalies size : " + tukeysMetrics.size());
+ TreeMap<Long, Double> tukeysMetricValues = new TreeMap<>();
+
+ for (TimelineMetric tukeysMetric : tukeysMetrics) {
+ tukeysMetricValues.putAll(tukeysMetric.getMetricValues());
+ timelineMetrics.addOrMergeTimelineMetric(tukeysMetric);
+ }
+
+ TimelineMetrics emaData = metricsCollectorInterface.fetchMetrics(metricKey, MetricsCollectorInterface.serviceName+"-ema", MetricsCollectorInterface.getDefaultLocalHostName(), startTime - testIntervalMillis, startTime);
+ TreeMap<Long, Double> emaMetricValues = new TreeMap();
+ if (!emaData.getMetrics().isEmpty()) {
+ emaMetricValues = emaData.getMetrics().get(0).getMetricValues();
+ }
+
+ LOG.info("Ema anomalies size : " + emaMetricValues.size());
+ int tp = 0;
+ int tn = 0;
+ int fp = 0;
+ int fn = 0;
+
+ for (double ts : testTs) {
+ long timestamp = (long) ts;
+ if (tukeysMetricValues.containsKey(timestamp)) {
+ if (emaMetricValues.containsKey(timestamp)) {
+ tp++;
+ } else {
+ fn++;
+ }
+ } else {
+ if (emaMetricValues.containsKey(timestamp)) {
+ fp++;
+ } else {
+ tn++;
+ }
+ }
+ }
+
+ double recall = (double) tp / (double) (tp + fn);
+ double precision = (double) tp / (double) (tp + fp);
+ LOG.info("----------------------------");
+ LOG.info("Precision Recall values for " + metricKey);
+ LOG.info("tp=" + tp + ", fp=" + fp + ", tn=" + tn + ", fn=" + fn);
+ LOG.info("----------------------------");
+
+ if (recall < 0.5) {
+ LOG.info("Increasing EMA sensitivity by 10%");
+ emaModel.updateModel(true, 10);
+ } else if (precision < 0.5) {
+ LOG.info("Decreasing EMA sensitivity by 10%");
+ emaModel.updateModel(false, 10);
+ }
+
+ }
+
+ if (emaTechnique.getTrackedEmas().isEmpty()){
+ LOG.info("No EMA Technique keys tracked!!!!");
+ }
+
+ if (!timelineMetrics.getMetrics().isEmpty()) {
+ metricsCollectorInterface.emitMetrics(timelineMetrics);
+ }
+ }
+
+ private static List<TimelineMetric> getAsTimelineMetric(ResultSet result, String metricName, String appId, String hostname) {
+
+ List<TimelineMetric> timelineMetrics = new ArrayList<>();
+
+ if (result == null) {
+ LOG.info("ResultSet from R call is null!!");
+ return null;
+ }
+
+ if (result.resultset.size() > 0) {
+ double[] ts = result.resultset.get(0);
+ double[] metrics = result.resultset.get(1);
+ double[] anomalyScore = result.resultset.get(2);
+ for (int i = 0; i < ts.length; i++) {
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(metricName + "_" + appId + "_" + hostname);
+ timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+ timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys");
+ timelineMetric.setInstanceId(null);
+ timelineMetric.setStartTime((long) ts[i]);
+ TreeMap<Long, Double> metricValues = new TreeMap<>();
+ metricValues.put((long) ts[i], metrics[i]);
+
+ HashMap<String, String> metadata = new HashMap<>();
+ metadata.put("method", "tukeys");
+ metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
+ timelineMetric.setMetadata(metadata);
+
+ timelineMetric.setMetricValues(metricValues);
+ timelineMetrics.add(timelineMetric);
+ }
+ }
+
+ return timelineMetrics;
+ }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java
new file mode 100644
index 0000000..4fdf27d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.rosuda.JRI.REXP;
+import org.rosuda.JRI.RVector;
+import org.rosuda.JRI.Rengine;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class RFunctionInvoker {
+
+ static final Log LOG = LogFactory.getLog(RFunctionInvoker.class);
+ public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
+ private static String rScriptDir = "/usr/lib/ambari-metrics-collector/R-scripts";
+
+ private static void loadDataSets(Rengine r, DataSeries trainData, DataSeries testData) {
+ r.assign("train_ts", trainData.ts);
+ r.assign("train_x", trainData.values);
+ r.eval("train_data <- data.frame(train_ts,train_x)");
+ r.eval("names(train_data) <- c(\"TS\", " + trainData.seriesName + ")");
+
+ r.assign("test_ts", testData.ts);
+ r.assign("test_x", testData.values);
+ r.eval("test_data <- data.frame(test_ts,test_x)");
+ r.eval("names(test_data) <- c(\"TS\", " + testData.seriesName + ")");
+ }
+
+ public static void setScriptsDir(String dir) {
+ rScriptDir = dir;
+ }
+
+ public static ResultSet executeMethod(String methodType, DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+
+ ResultSet result;
+ switch (methodType) {
+ case "tukeys":
+ result = tukeys(trainData, testData, configs);
+ break;
+ case "ema":
+ result = ema_global(trainData, testData, configs);
+ break;
+ case "ks":
+ result = ksTest(trainData, testData, configs);
+ break;
+ case "hsdev":
+ result = hsdev(trainData, testData, configs);
+ break;
+ default:
+ result = tukeys(trainData, testData, configs);
+ break;
+ }
+ return result;
+ }
+
+ public static ResultSet tukeys(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+ try {
+
+ REXP exp1 = r.eval("source('" + rScriptDir + "/tukeys.r" + "')");
+
+ double n = Double.parseDouble(configs.get("tukeys.n"));
+ r.eval("n <- " + n);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ams_tukeys(train_data, test_data, n)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i < cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet ema_global(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+ try {
+ r.eval("source('" + rScriptDir + "/ema.r" + "')");
+
+ int n = Integer.parseInt(configs.get("ema.n"));
+ r.eval("n <- " + n);
+
+ double w = Double.parseDouble(configs.get("ema.w"));
+ r.eval("w <- " + w);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ema_global(train_data, test_data, w, n)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i < cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet ema_daily(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+ try {
+ r.eval("source('" + rScriptDir + "/ema.r" + "')");
+
+ int n = Integer.parseInt(configs.get("ema.n"));
+ r.eval("n <- " + n);
+
+ double w = Double.parseDouble(configs.get("ema.w"));
+ r.eval("w <- " + w);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ema_daily(train_data, test_data, w, n)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i < cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet ksTest(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+ try {
+ r.eval("source('" + rScriptDir + "/kstest.r" + "')");
+
+ double p_value = Double.parseDouble(configs.get("ks.p_value"));
+ r.eval("p_value <- " + p_value);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ams_ks(train_data, test_data, p_value)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i < cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet hsdev(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+ try {
+ r.eval("source('" + rScriptDir + "/hsdev.r" + "')");
+
+ int n = Integer.parseInt(configs.get("hsdev.n"));
+ r.eval("n <- " + n);
+
+ int nhp = Integer.parseInt(configs.get("hsdev.nhp"));
+ r.eval("nhp <- " + nhp);
+
+ long interval = Long.parseLong(configs.get("hsdev.interval"));
+ r.eval("interval <- " + interval);
+
+ long period = Long.parseLong(configs.get("hsdev.period"));
+ r.eval("period <- " + period);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)");
+ REXP exp = r.eval("an2");
+ RVector cont = (RVector) exp.getContent();
+
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i < cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java
new file mode 100644
index 0000000..7485f01
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Collections;
+import java.util.Map;
+
+@XmlRootElement
+public class TestSeriesInputRequest {
+
+ private String seriesName;
+ private String seriesType;
+ private Map<String, String> configs;
+
+ public TestSeriesInputRequest() {
+ }
+
+ public TestSeriesInputRequest(String seriesName, String seriesType, Map<String, String> configs) {
+ this.seriesName = seriesName;
+ this.seriesType = seriesType;
+ this.configs = configs;
+ }
+
+ public String getSeriesName() {
+ return seriesName;
+ }
+
+ public void setSeriesName(String seriesName) {
+ this.seriesName = seriesName;
+ }
+
+ public String getSeriesType() {
+ return seriesType;
+ }
+
+ public void setSeriesType(String seriesType) {
+ this.seriesType = seriesType;
+ }
+
+ public Map<String, String> getConfigs() {
+ return configs;
+ }
+
+ public void setConfigs(Map<String, String> configs) {
+ this.configs = configs;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ TestSeriesInputRequest anotherInput = (TestSeriesInputRequest)o;
+ return anotherInput.getSeriesName().equals(this.getSeriesName());
+ }
+
+ @Override
+ public int hashCode() {
+ return seriesName.hashCode();
+ }
+
+ public static void main(String[] args) {
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ TestSeriesInputRequest testSeriesInputRequest = new TestSeriesInputRequest("test", "ema", Collections.singletonMap("key","value"));
+ try {
+ System.out.print(objectMapper.writeValueAsString(testSeriesInputRequest));
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
new file mode 100644
index 0000000..1534b55
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.hsdev.HsdevTechnique;
+import org.apache.ambari.metrics.alertservice.prototype.methods.kstest.KSTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class TrendADSystem implements Serializable {
+
+ private MetricsCollectorInterface metricsCollectorInterface;
+ private List<TrendMetric> trendMetrics;
+
+ private long ksTestIntervalMillis = 10 * 60 * 1000;
+ private long ksTrainIntervalMillis = 10 * 60 * 1000;
+ private KSTechnique ksTechnique;
+
+ private HsdevTechnique hsdevTechnique;
+ private int hsdevNumHistoricalPeriods = 3;
+
+ private Map<KsSingleRunKey, MetricAnomaly> trackedKsAnomalies = new HashMap<>();
+ private static final Log LOG = LogFactory.getLog(TrendADSystem.class);
+ private String inputFile = "";
+
+ public TrendADSystem(MetricsCollectorInterface metricsCollectorInterface,
+ long ksTestIntervalMillis,
+ long ksTrainIntervalMillis,
+ int hsdevNumHistoricalPeriods,
+ String inputFileName) {
+
+ this.metricsCollectorInterface = metricsCollectorInterface;
+ this.ksTestIntervalMillis = ksTestIntervalMillis;
+ this.ksTrainIntervalMillis = ksTrainIntervalMillis;
+ this.hsdevNumHistoricalPeriods = hsdevNumHistoricalPeriods;
+
+ this.ksTechnique = new KSTechnique();
+ this.hsdevTechnique = new HsdevTechnique();
+
+ trendMetrics = new ArrayList<>();
+ this.inputFile = inputFileName;
+ readInputFile(inputFileName);
+ }
+
+ public void runKSTest(long currentEndTime) {
+ readInputFile(inputFile);
+
+ long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis;
+ LOG.info("Running KS Test for test data interval [" + new Date(ksTestIntervalStartTime) + " : " +
+ new Date(currentEndTime) + "], with train data period [" + new Date(ksTestIntervalStartTime - ksTrainIntervalMillis)
+ + " : " + new Date(ksTestIntervalStartTime) + "]");
+
+ for (TrendMetric metric : trendMetrics) {
+ String metricName = metric.metricName;
+ String appId = metric.appId;
+ String hostname = metric.hostname;
+ String key = metricName + "_" + appId + "_" + hostname;
+
+ TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, ksTestIntervalStartTime - ksTrainIntervalMillis,
+ currentEndTime);
+
+ if (ksData.getMetrics().isEmpty()) {
+ LOG.info("No metrics fetched for KS, metricKey = " + key);
+ continue;
+ }
+
+ List<Double> trainTsList = new ArrayList<>();
+ List<Double> trainDataList = new ArrayList<>();
+ List<Double> testTsList = new ArrayList<>();
+ List<Double> testDataList = new ArrayList<>();
+
+ for (TimelineMetric timelineMetric : ksData.getMetrics()) {
+ for (Long timestamp : timelineMetric.getMetricValues().keySet()) {
+ if (timestamp <= ksTestIntervalStartTime) {
+ trainDataList.add(timelineMetric.getMetricValues().get(timestamp));
+ trainTsList.add((double) timestamp);
+ } else {
+ testDataList.add(timelineMetric.getMetricValues().get(timestamp));
+ testTsList.add((double) timestamp);
+ }
+ }
+ }
+
+ if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+ LOG.info("Not enough train/test data to perform KS analysis.");
+ continue;
+ }
+
+ String ksTrainSeries = "KSTrainSeries";
+ double[] trainTs = new double[trainTsList.size()];
+ double[] trainData = new double[trainTsList.size()];
+ for (int i = 0; i < trainTs.length; i++) {
+ trainTs[i] = trainTsList.get(i);
+ trainData[i] = trainDataList.get(i);
+ }
+
+ String ksTestSeries = "KSTestSeries";
+ double[] testTs = new double[testTsList.size()];
+ double[] testData = new double[testTsList.size()];
+ for (int i = 0; i < testTs.length; i++) {
+ testTs[i] = testTsList.get(i);
+ testData[i] = testDataList.get(i);
+ }
+
+ LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
+
+ DataSeries ksTrainData = new DataSeries(ksTrainSeries, trainTs, trainData);
+ DataSeries ksTestData = new DataSeries(ksTestSeries, testTs, testData);
+
+ MetricAnomaly metricAnomaly = ksTechnique.runKsTest(key, ksTrainData, ksTestData);
+ if (metricAnomaly == null) {
+ LOG.info("No anomaly from KS test.");
+ } else {
+ LOG.info("Found Anomaly in KS Test. Publishing KS Anomaly metric....");
+ TimelineMetric timelineMetric = getAsTimelineMetric(metricAnomaly,
+ ksTestIntervalStartTime, currentEndTime, ksTestIntervalStartTime - ksTrainIntervalMillis, ksTestIntervalStartTime);
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+ metricsCollectorInterface.emitMetrics(timelineMetrics);
+
+ trackedKsAnomalies.put(new KsSingleRunKey(ksTestIntervalStartTime, currentEndTime, metricName, appId, hostname), metricAnomaly);
+ }
+ }
+
+ if (trendMetrics.isEmpty()) {
+ LOG.info("No Trend metrics tracked!!!!");
+ }
+
+ }
+
+ private TimelineMetric getAsTimelineMetric(MetricAnomaly metricAnomaly,
+ long testStart,
+ long testEnd,
+ long trainStart,
+ long trainEnd) {
+
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(metricAnomaly.getMetricKey());
+ timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-" + metricAnomaly.getMethodType());
+ timelineMetric.setInstanceId(null);
+ timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+ timelineMetric.setStartTime(testEnd);
+ HashMap<String, String> metadata = new HashMap<>();
+ metadata.put("method", metricAnomaly.getMethodType());
+ metadata.put("anomaly-score", String.valueOf(metricAnomaly.getAnomalyScore()));
+ metadata.put("test-start-time", String.valueOf(testStart));
+ metadata.put("train-start-time", String.valueOf(trainStart));
+ metadata.put("train-end-time", String.valueOf(trainEnd));
+ timelineMetric.setMetadata(metadata);
+ TreeMap<Long,Double> metricValues = new TreeMap<>();
+ metricValues.put(testEnd, metricAnomaly.getMetricValue());
+ timelineMetric.setMetricValues(metricValues);
+ return timelineMetric;
+
+ }
+ public void runHsdevMethod() {
+
+ List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>();
+
+ for (KsSingleRunKey ksSingleRunKey : trackedKsAnomalies.keySet()) {
+
+ long hsdevTestEnd = ksSingleRunKey.endTime;
+ long hsdevTestStart = ksSingleRunKey.startTime;
+
+ long period = hsdevTestEnd - hsdevTestStart;
+
+ long hsdevTrainStart = hsdevTestStart - (hsdevNumHistoricalPeriods) * period;
+ long hsdevTrainEnd = hsdevTestStart;
+
+ LOG.info("Running HSdev Test for test data interval [" + new Date(hsdevTestStart) + " : " +
+ new Date(hsdevTestEnd) + "], with train data period [" + new Date(hsdevTrainStart)
+ + " : " + new Date(hsdevTrainEnd) + "]");
+
+ String metricName = ksSingleRunKey.metricName;
+ String appId = ksSingleRunKey.appId;
+ String hostname = ksSingleRunKey.hostname;
+ String key = metricName + "_" + appId + "_" + hostname;
+
+ TimelineMetrics hsdevData = metricsCollectorInterface.fetchMetrics(
+ metricName,
+ appId,
+ hostname,
+ hsdevTrainStart,
+ hsdevTestEnd);
+
+ if (hsdevData.getMetrics().isEmpty()) {
+ LOG.info("No metrics fetched for HSDev, metricKey = " + key);
+ continue;
+ }
+
+ List<Double> trainTsList = new ArrayList<>();
+ List<Double> trainDataList = new ArrayList<>();
+ List<Double> testTsList = new ArrayList<>();
+ List<Double> testDataList = new ArrayList<>();
+
+ for (TimelineMetric timelineMetric : hsdevData.getMetrics()) {
+ for (Long timestamp : timelineMetric.getMetricValues().keySet()) {
+ if (timestamp <= hsdevTestStart) {
+ trainDataList.add(timelineMetric.getMetricValues().get(timestamp));
+ trainTsList.add((double) timestamp);
+ } else {
+ testDataList.add(timelineMetric.getMetricValues().get(timestamp));
+ testTsList.add((double) timestamp);
+ }
+ }
+ }
+
+ if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+ LOG.info("Not enough train/test data to perform Hsdev analysis.");
+ continue;
+ }
+
+ String hsdevTrainSeries = "HsdevTrainSeries";
+ double[] trainTs = new double[trainTsList.size()];
+ double[] trainData = new double[trainTsList.size()];
+ for (int i = 0; i < trainTs.length; i++) {
+ trainTs[i] = trainTsList.get(i);
+ trainData[i] = trainDataList.get(i);
+ }
+
+ String hsdevTestSeries = "HsdevTestSeries";
+ double[] testTs = new double[testTsList.size()];
+ double[] testData = new double[testTsList.size()];
+ for (int i = 0; i < testTs.length; i++) {
+ testTs[i] = testTsList.get(i);
+ testData[i] = testDataList.get(i);
+ }
+
+ LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
+
+ DataSeries hsdevTrainData = new DataSeries(hsdevTrainSeries, trainTs, trainData);
+ DataSeries hsdevTestData = new DataSeries(hsdevTestSeries, testTs, testData);
+
+ MetricAnomaly metricAnomaly = hsdevTechnique.runHsdevTest(key, hsdevTrainData, hsdevTestData);
+ if (metricAnomaly == null) {
+ LOG.info("No anomaly from Hsdev test. Mismatch between KS and HSDev. ");
+ ksTechnique.updateModel(key, false, 10);
+ } else {
+ LOG.info("Found Anomaly in Hsdev Test. This confirms KS anomaly.");
+ hsdevMetricAnomalies.add(getAsTimelineMetric(metricAnomaly,
+ hsdevTestStart, hsdevTestEnd, hsdevTrainStart, hsdevTrainEnd));
+ }
+ }
+ clearTrackedKsRunKeys();
+
+ if (!hsdevMetricAnomalies.isEmpty()) {
+ LOG.info("Publishing Hsdev Anomalies....");
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.setMetrics(hsdevMetricAnomalies);
+ metricsCollectorInterface.emitMetrics(timelineMetrics);
+ }
+ }
+
+ private void clearTrackedKsRunKeys() {
+ trackedKsAnomalies.clear();
+ }
+
+ private void readInputFile(String fileName) {
+ trendMetrics.clear();
+ try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
+ for (String line; (line = br.readLine()) != null; ) {
+ String[] splits = line.split(",");
+ LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]);
+ trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2]));
+ }
+ } catch (IOException e) {
+ LOG.error("Error reading input file : " + e);
+ }
+ }
+
+ class KsSingleRunKey implements Serializable{
+
+ long startTime;
+ long endTime;
+ String metricName;
+ String appId;
+ String hostname;
+
+ public KsSingleRunKey(long startTime, long endTime, String metricName, String appId, String hostname) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.metricName = metricName;
+ this.appId = appId;
+ this.hostname = hostname;
+ }
+ }
+
+ /*
+ boolean isPresent = false;
+ for (TrendMetric trendMetric : trendMetrics) {
+ if (trendMetric.metricName.equalsIgnoreCase(splits[0])) {
+ isPresent = true;
+ }
+ }
+ if (!isPresent) {
+ LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]);
+ trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2]));
+ }
+ */
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/MetricAnomalyModel.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendMetric.java
similarity index 68%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/MetricAnomalyModel.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendMetric.java
index af33d26..3bead8b 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/MetricAnomalyModel.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendMetric.java
@@ -15,15 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ambari.metrics.alertservice.methods;
+package org.apache.ambari.metrics.alertservice.prototype;
-import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
+import java.io.Serializable;
-import java.util.List;
+public class TrendMetric implements Serializable {
-public interface MetricAnomalyModel {
+ String metricName;
+ String appId;
+ String hostname;
- public List<MetricAnomaly> onNewMetric(TimelineMetric metric);
- public List<MetricAnomaly> test(TimelineMetric metric);
+ public TrendMetric(String metricName, String appId, String hostname) {
+ this.metricName = metricName;
+ this.appId = appId;
+ this.hostname = hostname;
+ }
}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/DataSet.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
similarity index 77%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/DataSet.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
index a709c73..eb19857 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/DataSet.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
@@ -15,24 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ambari.metrics.alertservice.common;
+package org.apache.ambari.metrics.alertservice.prototype.common;
import java.util.Arrays;
-public class DataSet {
+public class DataSeries {
- public String metricName;
+ public String seriesName;
public double[] ts;
public double[] values;
- public DataSet(String metricName, double[] ts, double[] values) {
- this.metricName = metricName;
+ public DataSeries(String seriesName, double[] ts, double[] values) {
+ this.seriesName = seriesName;
this.ts = ts;
this.values = values;
}
@Override
public String toString() {
- return metricName + Arrays.toString(ts) + Arrays.toString(values);
+ return seriesName + Arrays.toString(ts) + Arrays.toString(values);
}
}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/ResultSet.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
similarity index 91%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/ResultSet.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
index 9415c1b..101b0e9 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/ResultSet.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ambari.metrics.alertservice.common;
+package org.apache.ambari.metrics.alertservice.prototype.common;
import java.util.ArrayList;
@@ -23,7 +23,7 @@ import java.util.List;
public class ResultSet {
- List<double[]> resultset = new ArrayList<>();
+ public List<double[]> resultset = new ArrayList<>();
public ResultSet(List<double[]> resultset) {
this.resultset = resultset;
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/StatisticUtils.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
similarity index 55%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/StatisticUtils.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
index 81bd77b..4ea4ac5 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/StatisticUtils.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
@@ -15,24 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ambari.metrics.alertservice.common;
+package org.apache.ambari.metrics.alertservice.prototype.common;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
public class StatisticUtils {
- public static double mean(Collection<Double> values) {
+ public static double mean(double[] values) {
double sum = 0;
for (double d : values) {
sum += d;
}
- return sum / values.size();
+ return sum / values.length;
}
- public static double variance(Collection<Double> values) {
+ public static double variance(double[] values) {
double avg = mean(values);
double variance = 0;
for (double d : values) {
@@ -41,37 +42,21 @@ public class StatisticUtils {
return variance;
}
- public static double sdev(Collection<Double> values, boolean useBesselsCorrection) {
+ public static double sdev(double[] values, boolean useBesselsCorrection) {
double variance = variance(values);
- int n = (useBesselsCorrection) ? values.size() - 1 : values.size();
+ int n = (useBesselsCorrection) ? values.length - 1 : values.length;
return Math.sqrt(variance / n);
}
- public static double median(Collection<Double> values) {
- ArrayList<Double> clonedValues = new ArrayList<Double>(values);
- Collections.sort(clonedValues);
- int n = values.size();
+ public static double median(double[] values) {
+ double[] clonedValues = Arrays.copyOf(values, values.length);
+ Arrays.sort(clonedValues);
+ int n = values.length;
if (n % 2 != 0) {
- return clonedValues.get((n-1)/2);
+ return clonedValues[(n-1)/2];
} else {
- return ( clonedValues.get((n-1)/2) + clonedValues.get(n/2) ) / 2;
+ return ( clonedValues[(n-1)/2] + clonedValues[n/2] ) / 2;
}
}
-
-
-
-// public static void main(String[] args) {
-//
-// Collection<Double> values = new ArrayList<>();
-// values.add(1.0);
-// values.add(2.0);
-// values.add(3.0);
-// values.add(4.0);
-// values.add(5.0);
-//
-// System.out.println(mean(values));
-// System.out.println(sdev(values, false));
-// System.out.println(median(values));
-// }
}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaResult.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
similarity index 62%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaResult.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
index 2d24a9c..0b10b4b 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaResult.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
@@ -6,31 +6,27 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ambari.metrics.alertservice.methods.ema;
+package org.apache.ambari.metrics.alertservice.prototype.methods;
-import org.apache.ambari.metrics.alertservice.common.MethodResult;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-public class EmaResult extends MethodResult{
+import java.sql.Time;
+import java.util.List;
+import java.util.Map;
- double diff;
+public abstract class AnomalyDetectionTechnique {
- public EmaResult(double diff) {
- this.methodType = "EMA";
- this.diff = diff;
- }
+ protected String methodType;
+ public abstract List<MetricAnomaly> test(TimelineMetric metric);
- @Override
- public String prettyPrint() {
- return methodType + "(` = " + diff + ")";
- }
}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
new file mode 100644
index 0000000..da4f030
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetricAnomaly implements Serializable{
+
+ private String methodType;
+ private double anomalyScore;
+ private String metricKey;
+ private long timestamp;
+ private double metricValue;
+
+
+ public MetricAnomaly(String metricKey, long timestamp, double metricValue, String methodType, double anomalyScore) {
+ this.metricKey = metricKey;
+ this.timestamp = timestamp;
+ this.metricValue = metricValue;
+ this.methodType = methodType;
+ this.anomalyScore = anomalyScore;
+
+ }
+
+ public String getMethodType() {
+ return methodType;
+ }
+
+ public void setMethodType(String methodType) {
+ this.methodType = methodType;
+ }
+
+ public double getAnomalyScore() {
+ return anomalyScore;
+ }
+
+ public void setAnomalyScore(double anomalyScore) {
+ this.anomalyScore = anomalyScore;
+ }
+
+ public void setMetricKey(String metricKey) {
+ this.metricKey = metricKey;
+ }
+
+ public String getMetricKey() {
+ return metricKey;
+ }
+
+ public void setMetricName(String metricName) {
+ this.metricKey = metricName;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public double getMetricValue() {
+ return metricValue;
+ }
+
+ public void setMetricValue(double metricValue) {
+ this.metricValue = metricValue;
+ }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
new file mode 100644
index 0000000..5e1f76b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods.ema;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+
+@XmlRootElement
+public class EmaModel implements Serializable {
+
+ private String metricName;
+ private String hostname;
+ private String appId;
+ private double ema;
+ private double ems;
+ private double weight;
+ private double timessdev;
+
+ private int ctr = 0;
+ private static final int suppressAnomaliesTheshold = 30;
+
+ private static final Log LOG = LogFactory.getLog(EmaModel.class);
+
+ public EmaModel(String name, String hostname, String appId, double weight, double timessdev) {
+ this.metricName = name;
+ this.hostname = hostname;
+ this.appId = appId;
+ this.weight = weight;
+ this.timessdev = timessdev;
+ this.ema = 0.0;
+ this.ems = 0.0;
+ }
+
+ public String getMetricName() {
+ return metricName;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public double testAndUpdate(double metricValue) {
+
+ double anomalyScore = 0.0;
+ if (ctr > suppressAnomaliesTheshold) {
+ anomalyScore = test(metricValue);
+ }
+ if (Math.abs(anomalyScore) < 2 * timessdev) {
+ update(metricValue);
+ } else {
+ LOG.info("Not updating model for this value");
+ }
+ ctr++;
+ LOG.info("Counter : " + ctr);
+ LOG.info("Anomaly Score for " + metricValue + " : " + anomalyScore);
+ return anomalyScore;
+ }
+
+ public void update(double metricValue) {
+ ema = weight * ema + (1 - weight) * metricValue;
+ ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0));
+ LOG.info("In update : ema = " + ema + ", ems = " + ems);
+ }
+
+ public double test(double metricValue) {
+ LOG.info("In test : ema = " + ema + ", ems = " + ems);
+ double diff = Math.abs(ema - metricValue) - (timessdev * ems);
+ LOG.info("diff = " + diff);
+ if (diff > 0) {
+ return Math.abs((metricValue - ema) / ems); //Z score
+ } else {
+ return 0.0;
+ }
+ }
+
+ public void updateModel(boolean increaseSensitivity, double percent) {
+ LOG.info("Updating model for " + metricName + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent);
+ double delta = percent / 100;
+ if (increaseSensitivity) {
+ delta = delta * -1;
+ }
+ this.timessdev = timessdev + delta * timessdev;
+ this.weight = Math.min(1.0, weight + delta * weight);
+ LOG.info("New model parameters " + metricName + " : timessdev = " + timessdev + ", weight = " + weight);
+ }
+
+ public double getWeight() {
+ return weight;
+ }
+
+ public void setWeight(double weight) {
+ this.weight = weight;
+ }
+
+ public double getTimessdev() {
+ return timessdev;
+ }
+
+ public void setTimessdev(double timessdev) {
+ this.timessdev = timessdev;
+ }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModelLoader.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
similarity index 64%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModelLoader.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
index 0205844..62749c1 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModelLoader.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
@@ -15,32 +15,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ambari.metrics.alertservice.methods.ema;
+package org.apache.ambari.metrics.alertservice.prototype.methods.ema;
import com.google.gson.Gson;
-import com.sun.org.apache.commons.logging.Log;
-import com.sun.org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkContext;
import org.apache.spark.mllib.util.Loader;
-import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
-public class EmaModelLoader implements Loader<EmaModel> {
+public class EmaModelLoader implements Loader<EmaTechnique> {
private static final Log LOG = LogFactory.getLog(EmaModelLoader.class);
@Override
- public EmaModel load(SparkContext sc, String path) {
- Gson gson = new Gson();
- try {
- String fileString = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
- return gson.fromJson(fileString, EmaModel.class);
- } catch (IOException e) {
- LOG.error(e);
- }
- return null;
+ public EmaTechnique load(SparkContext sc, String path) {
+ return new EmaTechnique(0.5,3);
+// Gson gson = new Gson();
+// try {
+// String fileString = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
+// return gson.fromJson(fileString, EmaTechnique.class);
+// } catch (IOException e) {
+// LOG.error(e);
+// }
+// return null;
}
}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
new file mode 100644
index 0000000..c005e6f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods.ema;
+
+import com.google.gson.Gson;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.AnomalyDetectionTechnique;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.spark.SparkContext;
+import org.apache.spark.mllib.util.Saveable;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@XmlRootElement
+public class EmaTechnique extends AnomalyDetectionTechnique implements Serializable, Saveable {
+
+ @XmlElement(name = "trackedEmas")
+ private Map<String, EmaModel> trackedEmas;
+ private static final Log LOG = LogFactory.getLog(EmaTechnique.class);
+
+ private double startingWeight = 0.5;
+ private double startTimesSdev = 3.0;
+ private String methodType = "ema";
+
+ public EmaTechnique(double startingWeight, double startTimesSdev) {
+ trackedEmas = new HashMap<>();
+ this.startingWeight = startingWeight;
+ this.startTimesSdev = startTimesSdev;
+ LOG.info("New EmaTechnique......");
+ }
+
+ public List<MetricAnomaly> test(TimelineMetric metric) {
+ String metricName = metric.getMetricName();
+ String appId = metric.getAppId();
+ String hostname = metric.getHostName();
+ String key = metricName + "_" + appId + "_" + hostname;
+
+ EmaModel emaModel = trackedEmas.get(key);
+ if (emaModel == null) {
+ LOG.info("EmaModel not present for " + key);
+ LOG.info("Number of tracked Emas : " + trackedEmas.size());
+ emaModel = new EmaModel(metricName, hostname, appId, startingWeight, startTimesSdev);
+ trackedEmas.put(key, emaModel);
+ } else {
+ LOG.info("EmaModel already present for " + key);
+ }
+
+ List<MetricAnomaly> anomalies = new ArrayList<>();
+
+ for (Long timestamp : metric.getMetricValues().keySet()) {
+ double metricValue = metric.getMetricValues().get(timestamp);
+ double anomalyScore = emaModel.testAndUpdate(metricValue);
+ if (anomalyScore > 0.0) {
+ LOG.info("Found anomaly for : " + key);
+ MetricAnomaly metricAnomaly = new MetricAnomaly(key, timestamp, metricValue, methodType, anomalyScore);
+ anomalies.add(metricAnomaly);
+ } else {
+ LOG.info("Discarding non-anomaly for : " + key);
+ }
+ }
+ return anomalies;
+ }
+
+ public boolean updateModel(TimelineMetric timelineMetric, boolean increaseSensitivity, double percent) {
+ String metricName = timelineMetric.getMetricName();
+ String appId = timelineMetric.getAppId();
+ String hostname = timelineMetric.getHostName();
+ String key = metricName + "_" + appId + "_" + hostname;
+
+
+ EmaModel emaModel = trackedEmas.get(key);
+
+ if (emaModel == null) {
+ LOG.warn("EMA Model for " + key + " not found");
+ return false;
+ }
+ emaModel.updateModel(increaseSensitivity, percent);
+
+ return true;
+ }
+
+ @Override
+ public void save(SparkContext sc, String path) {
+ Gson gson = new Gson();
+ try {
+ String json = gson.toJson(this);
+ try (Writer writer = new BufferedWriter(new OutputStreamWriter(
+ new FileOutputStream(path), "utf-8"))) {
+ writer.write(json);
+ }
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ }
+
+ @Override
+ public String formatVersion() {
+ return "1.0";
+ }
+
+ public Map<String, EmaModel> getTrackedEmas() {
+ return trackedEmas;
+ }
+
+ public double getStartingWeight() {
+ return startingWeight;
+ }
+
+ public double getStartTimesSdev() {
+ return startTimesSdev;
+ }
+
+}
+
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
new file mode 100644
index 0000000..50bf9f2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods.hsdev;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import static org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.median;
+import static org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.sdev;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HsdevTechnique implements Serializable {
+
+ private Map<String, Double> hsdevMap;
+ private String methodType = "hsdev";
+ private static final Log LOG = LogFactory.getLog(HsdevTechnique.class);
+
+ public HsdevTechnique() {
+ hsdevMap = new HashMap<>();
+ }
+
+ public MetricAnomaly runHsdevTest(String key, DataSeries trainData, DataSeries testData) {
+ int testLength = testData.values.length;
+ int trainLength = trainData.values.length;
+
+ if (trainLength < testLength) {
+ LOG.info("Not enough train data.");
+ return null;
+ }
+
+ if (!hsdevMap.containsKey(key)) {
+ hsdevMap.put(key, 3.0);
+ }
+
+ double n = hsdevMap.get(key);
+
+ double historicSd = sdev(trainData.values, false);
+ double historicMedian = median(trainData.values);
+ double currentMedian = median(testData.values);
+
+ double diff = Math.abs(currentMedian - historicMedian);
+ LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1]));
+ LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd);
+
+ if (diff > n * historicSd) {
+ double zScore = diff / historicSd;
+ LOG.info("Z Score of current series : " + zScore);
+ return new MetricAnomaly(key,
+ (long) testData.ts[testLength - 1],
+ testData.values[testLength - 1],
+ methodType,
+ zScore);
+ }
+ return null;
+ }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java
new file mode 100644
index 0000000..ff8dbcf
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.prototype.methods.kstest;
+
+import org.apache.ambari.metrics.alertservice.prototype.RFunctionInvoker;
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class KSTechnique implements Serializable {
+
+ private String methodType = "ks";
+ private Map<String, Double> pValueMap;
+ private static final Log LOG = LogFactory.getLog(KSTechnique.class);
+
+ public KSTechnique() {
+ pValueMap = new HashMap();
+ }
+
+ public MetricAnomaly runKsTest(String key, DataSeries trainData, DataSeries testData) {
+
+ int testLength = testData.values.length;
+ int trainLength = trainData.values.length;
+
+ if (trainLength < testLength) {
+ LOG.info("Not enough train data.");
+ return null;
+ }
+
+ if (!pValueMap.containsKey(key)) {
+ pValueMap.put(key, 0.05);
+ }
+ double pValue = pValueMap.get(key);
+
+ ResultSet result = RFunctionInvoker.ksTest(trainData, testData, Collections.singletonMap("ks.p_value", String.valueOf(pValue)));
+ if (result == null) {
+ LOG.error("Resultset is null when invoking KS R function...");
+ return null;
+ }
+
+ if (result.resultset.size() > 0) {
+
+ LOG.info("Is size 1 ? result size = " + result.resultset.get(0).length);
+ LOG.info("p_value = " + result.resultset.get(3)[0]);
+ double dValue = result.resultset.get(2)[0];
+
+ return new MetricAnomaly(key,
+ (long) testData.ts[testLength - 1],
+ testData.values[testLength - 1],
+ methodType,
+ dValue);
+ }
+
+ return null;
+ }
+
+ public void updateModel(String metricKey, boolean increaseSensitivity, double percent) {
+
+ LOG.info("Updating KS model for " + metricKey + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent);
+
+ if (!pValueMap.containsKey(metricKey)) {
+ LOG.error("Unknown metric key : " + metricKey);
+ LOG.info("pValueMap :" + pValueMap.toString());
+ return;
+ }
+
+ double delta = percent / 100;
+ if (!increaseSensitivity) {
+ delta = delta * -1;
+ }
+
+ double pValue = pValueMap.get(metricKey);
+ double newPValue = Math.min(1.0, pValue + delta * pValue);
+ pValueMap.put(metricKey, newPValue);
+ LOG.info("New pValue = " + newPValue);
+ }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MethodResult.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
similarity index 77%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MethodResult.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
index 6bf58df..a8e31bf 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MethodResult.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
@@ -15,13 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ambari.metrics.alertservice.common;
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
-public abstract class MethodResult {
- protected String methodType;
- public abstract String prettyPrint();
+public interface AbstractMetricSeries {
+
+ public double nextValue();
+ public double[] getSeries(int n);
- public String getMethodType() {
- return methodType;
- }
}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java
new file mode 100644
index 0000000..4158ff4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+public class DualBandMetricSeries implements AbstractMetricSeries {
+
+ double lowBandValue = 0.0;
+ double lowBandDeviationPercentage = 0.0;
+ int lowBandPeriodSize = 10;
+ double highBandValue = 1.0;
+ double highBandDeviationPercentage = 0.0;
+ int highBandPeriodSize = 10;
+
+ Random random = new Random();
+ double lowBandValueLowerLimit, lowBandValueHigherLimit;
+ double highBandLowerLimit, highBandUpperLimit;
+ int l = 0, h = 0;
+
+ public DualBandMetricSeries(double lowBandValue,
+ double lowBandDeviationPercentage,
+ int lowBandPeriodSize,
+ double highBandValue,
+ double highBandDeviationPercentage,
+ int highBandPeriodSize) {
+ this.lowBandValue = lowBandValue;
+ this.lowBandDeviationPercentage = lowBandDeviationPercentage;
+ this.lowBandPeriodSize = lowBandPeriodSize;
+ this.highBandValue = highBandValue;
+ this.highBandDeviationPercentage = highBandDeviationPercentage;
+ this.highBandPeriodSize = highBandPeriodSize;
+ init();
+ }
+
+ private void init() {
+ lowBandValueLowerLimit = lowBandValue - lowBandDeviationPercentage * lowBandValue;
+ lowBandValueHigherLimit = lowBandValue + lowBandDeviationPercentage * lowBandValue;
+ highBandLowerLimit = highBandValue - highBandDeviationPercentage * highBandValue;
+ highBandUpperLimit = highBandValue + highBandDeviationPercentage * highBandValue;
+ }
+
+ @Override
+ public double nextValue() {
+
+ double value = 0.0;
+
+ if (l < lowBandPeriodSize) {
+ value = lowBandValueLowerLimit + (lowBandValueHigherLimit - lowBandValueLowerLimit) * random.nextDouble();
+ l++;
+ } else if (h < highBandPeriodSize) {
+ value = highBandLowerLimit + (highBandUpperLimit - highBandLowerLimit) * random.nextDouble();
+ h++;
+ }
+
+ if (l == lowBandPeriodSize && h == highBandPeriodSize) {
+ l = 0;
+ h = 0;
+ }
+
+ return value;
+ }
+
+ @Override
+ public double[] getSeries(int n) {
+ double[] series = new double[n];
+ for (int i = 0; i < n; i++) {
+ series[i] = nextValue();
+ }
+ return series;
+ }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java
new file mode 100644
index 0000000..1e37ff3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Random;
+
+public class MetricSeriesGeneratorFactory {
+
+ /**
+ * Return a normally distributed data series with some deviation % and outliers.
+ *
+ * @param n size of the data series
+ * @param value The value around which the uniform data series is centered on.
+ * @param deviationPercentage The allowed deviation % on either side of the uniform value. For example, if value = 10, and deviation % is 0.1, the series values lie between 0.9 to 1.1.
+ * @param outlierProbability The probability of finding an outlier in the series.
+ * @param outlierDeviationLowerPercentage min percentage outlier should be away from the uniform value in % terms. if value = 10 and outlierDeviationPercentage = 30%, the outlier is 7 and 13.
+ * @param outlierDeviationHigherPercentage max percentage outlier should be away from the uniform value in % terms. if value = 10 and outlierDeviationPercentage = 60%, the outlier is 4 and 16.
+ * @param outliersAboveValue Outlier should be greater or smaller than the value.
+ * @return uniform series
+ */
+ public static double[] createUniformSeries(int n,
+ double value,
+ double deviationPercentage,
+ double outlierProbability,
+ double outlierDeviationLowerPercentage,
+ double outlierDeviationHigherPercentage,
+ boolean outliersAboveValue) {
+
+ UniformMetricSeries metricSeries = new UniformMetricSeries(value,
+ deviationPercentage,
+ outlierProbability,
+ outlierDeviationLowerPercentage,
+ outlierDeviationHigherPercentage,
+ outliersAboveValue);
+
+ return metricSeries.getSeries(n);
+ }
+
+
+ /**
+ * /**
+ * Returns a normally distributed series.
+ *
+ * @param n size of the data series
+ * @param mean mean of the distribution
+ * @param sd sd of the distribution
+ * @param outlierProbability sd of the distribution
+ * @param outlierDeviationSDTimesLower Lower Limit of the outlier with respect to times sdev from the mean.
+ * @param outlierDeviationSDTimesHigher Higher Limit of the outlier with respect to times sdev from the mean.
+ * @param outlierOnRightEnd Outlier should be on the right end or the left end.
+ * @return normal series
+ */
+ public static double[] createNormalSeries(int n,
+ double mean,
+ double sd,
+ double outlierProbability,
+ double outlierDeviationSDTimesLower,
+ double outlierDeviationSDTimesHigher,
+ boolean outlierOnRightEnd) {
+
+
+ NormalMetricSeries metricSeries = new NormalMetricSeries(mean,
+ sd,
+ outlierProbability,
+ outlierDeviationSDTimesLower,
+ outlierDeviationSDTimesHigher,
+ outlierOnRightEnd);
+
+ return metricSeries.getSeries(n);
+ }
+
+
+ /**
+ * Returns a monotonically increasing / decreasing series
+ *
+ * @param n size of the data series
+ * @param startValue Start value of the monotonic sequence
+ * @param slope direction of monotonicity m > 0 for increasing and m < 0 for decreasing.
+ * @param deviationPercentage The allowed deviation % on either side of the current 'y' value. For example, if current value = 10 according to slope, and deviation % is 0.1, the series values lie between 0.9 to 1.1.
+ * @param outlierProbability The probability of finding an outlier in the series.
+ * @param outlierDeviationLowerPercentage min percentage outlier should be away from the current 'y' value in % terms. if value = 10 and outlierDeviationPercentage = 30%, the outlier is 7 and 13.
+ * @param outlierDeviationHigherPercentage max percentage outlier should be away from the current 'y' value in % terms. if value = 10 and outlierDeviationPercentage = 60%, the outlier is 4 and 16.
+ * @param outliersAboveValue Outlier should be greater or smaller than the 'y' value.
+ * @return
+ */
+ public static double[] createMonotonicSeries(int n,
+ double startValue,
+ double slope,
+ double deviationPercentage,
+ double outlierProbability,
+ double outlierDeviationLowerPercentage,
+ double outlierDeviationHigherPercentage,
+ boolean outliersAboveValue) {
+
+ MonotonicMetricSeries metricSeries = new MonotonicMetricSeries(startValue,
+ slope,
+ deviationPercentage,
+ outlierProbability,
+ outlierDeviationLowerPercentage,
+ outlierDeviationHigherPercentage,
+ outliersAboveValue);
+
+ return metricSeries.getSeries(n);
+ }
+
+
+ /**
+ * Returns a dual band series (lower and higher)
+ *
+ * @param n size of the data series
+ * @param lowBandValue lower band value
+ * @param lowBandDeviationPercentage lower band deviation
+ * @param lowBandPeriodSize lower band
+ * @param highBandValue high band centre value
+ * @param highBandDeviationPercentage high band deviation.
+ * @param highBandPeriodSize high band size
+ * @return
+ */
+ public static double[] getDualBandSeries(int n,
+ double lowBandValue,
+ double lowBandDeviationPercentage,
+ int lowBandPeriodSize,
+ double highBandValue,
+ double highBandDeviationPercentage,
+ int highBandPeriodSize) {
+
+ DualBandMetricSeries metricSeries = new DualBandMetricSeries(lowBandValue,
+ lowBandDeviationPercentage,
+ lowBandPeriodSize,
+ highBandValue,
+ highBandDeviationPercentage,
+ highBandPeriodSize);
+
+ return metricSeries.getSeries(n);
+ }
+
+ /**
+ * Returns a step function series.
+ *
+ * @param n size of the data series
+ * @param startValue start steady value
+ * @param steadyValueDeviationPercentage required devation in the steady state value
+ * @param steadyPeriodSlope direction of monotonicity m > 0 for increasing and m < 0 for decreasing, m = 0 no increase or decrease.
+ * @param steadyPeriodMinSize min size for step period
+ * @param steadyPeriodMaxSize max size for step period.
+ * @param stepChangePercentage Increase / decrease in steady state to denote a step in terms of deviation percentage from the last value.
+ * @param upwardStep upward or downward step.
+ * @return
+ */
+ public static double[] getStepFunctionSeries(int n,
+ double startValue,
+ double steadyValueDeviationPercentage,
+ double steadyPeriodSlope,
+ int steadyPeriodMinSize,
+ int steadyPeriodMaxSize,
+ double stepChangePercentage,
+ boolean upwardStep) {
+
+ StepFunctionMetricSeries metricSeries = new StepFunctionMetricSeries(startValue,
+ steadyValueDeviationPercentage,
+ steadyPeriodSlope,
+ steadyPeriodMinSize,
+ steadyPeriodMaxSize,
+ stepChangePercentage,
+ upwardStep);
+
+ return metricSeries.getSeries(n);
+ }
+
+ /**
+ * Series with small period of turbulence and then back to steady.
+ *
+ * @param n size of the data series
+ * @param steadyStateValue steady state center value
+ * @param steadyStateDeviationPercentage steady state deviation in percentage
+ * @param turbulentPeriodDeviationLowerPercentage turbulent state lower limit in terms of percentage from centre value.
+ * @param turbulentPeriodDeviationHigherPercentage turbulent state higher limit in terms of percentage from centre value.
+ * @param turbulentPeriodLength turbulent period length (number of points)
+ * @param turbulentStatePosition Where the turbulent state should be 0 - at the beginning, 1 - in the middle (25% - 50% of the series), 2 - at the end of the series.
+ * @return
+ */
+ public static double[] getSteadySeriesWithTurbulentPeriod(int n,
+ double steadyStateValue,
+ double steadyStateDeviationPercentage,
+ double turbulentPeriodDeviationLowerPercentage,
+ double turbulentPeriodDeviationHigherPercentage,
+ int turbulentPeriodLength,
+ int turbulentStatePosition
+ ) {
+
+
+ SteadyWithTurbulenceMetricSeries metricSeries = new SteadyWithTurbulenceMetricSeries(n,
+ steadyStateValue,
+ steadyStateDeviationPercentage,
+ turbulentPeriodDeviationLowerPercentage,
+ turbulentPeriodDeviationHigherPercentage,
+ turbulentPeriodLength,
+ turbulentStatePosition);
+
+ return metricSeries.getSeries(n);
+ }
+
+
+ public static double[] generateSeries(String type, int n, Map<String, String> configs) {
+
+ double[] series;
+ switch (type) {
+
+ case "normal":
+ series = createNormalSeries(n,
+ Double.parseDouble(configs.getOrDefault("mean", "0")),
+ Double.parseDouble(configs.getOrDefault("sd", "1")),
+ Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesLower", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesHigher", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("outlierOnRightEnd", "true")));
+ break;
+
+ case "uniform":
+ series = createUniformSeries(n,
+ Double.parseDouble(configs.getOrDefault("value", "10")),
+ Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true")));
+ break;
+
+ case "monotonic":
+ series = createMonotonicSeries(n,
+ Double.parseDouble(configs.getOrDefault("startValue", "10")),
+ Double.parseDouble(configs.getOrDefault("slope", "0")),
+ Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true")));
+ break;
+
+ case "dualband":
+ series = getDualBandSeries(n,
+ Double.parseDouble(configs.getOrDefault("lowBandValue", "10")),
+ Double.parseDouble(configs.getOrDefault("lowBandDeviationPercentage", "0")),
+ Integer.parseInt(configs.getOrDefault("lowBandPeriodSize", "0")),
+ Double.parseDouble(configs.getOrDefault("highBandValue", "10")),
+ Double.parseDouble(configs.getOrDefault("highBandDeviationPercentage", "0")),
+ Integer.parseInt(configs.getOrDefault("highBandPeriodSize", "0")));
+ break;
+
+ case "step":
+ series = getStepFunctionSeries(n,
+ Double.parseDouble(configs.getOrDefault("startValue", "10")),
+ Double.parseDouble(configs.getOrDefault("steadyValueDeviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("steadyPeriodSlope", "0")),
+ Integer.parseInt(configs.getOrDefault("steadyPeriodMinSize", "0")),
+ Integer.parseInt(configs.getOrDefault("steadyPeriodMaxSize", "0")),
+ Double.parseDouble(configs.getOrDefault("stepChangePercentage", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("upwardStep", "true")));
+ break;
+
+ case "turbulence":
+ series = getSteadySeriesWithTurbulentPeriod(n,
+ Double.parseDouble(configs.getOrDefault("steadyStateValue", "10")),
+ Double.parseDouble(configs.getOrDefault("steadyStateDeviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationLowerPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationHigherPercentage", "10")),
+ Integer.parseInt(configs.getOrDefault("turbulentPeriodLength", "0")),
+ Integer.parseInt(configs.getOrDefault("turbulentStatePosition", "0")));
+ break;
+
+ default:
+ series = createNormalSeries(n,
+ 0,
+ 1,
+ 0,
+ 0,
+ 0,
+ true);
+ }
+ return series;
+ }
+
+ public static AbstractMetricSeries generateSeries(String type, Map<String, String> configs) {
+
+ AbstractMetricSeries series;
+ switch (type) {
+
+ case "normal":
+ series = new NormalMetricSeries(Double.parseDouble(configs.getOrDefault("mean", "0")),
+ Double.parseDouble(configs.getOrDefault("sd", "1")),
+ Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesLower", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesHigher", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("outlierOnRightEnd", "true")));
+ break;
+
+ case "uniform":
+ series = new UniformMetricSeries(
+ Double.parseDouble(configs.getOrDefault("value", "10")),
+ Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true")));
+ break;
+
+ case "monotonic":
+ series = new MonotonicMetricSeries(
+ Double.parseDouble(configs.getOrDefault("startValue", "10")),
+ Double.parseDouble(configs.getOrDefault("slope", "0")),
+ Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true")));
+ break;
+
+ case "dualband":
+ series = new DualBandMetricSeries(
+ Double.parseDouble(configs.getOrDefault("lowBandValue", "10")),
+ Double.parseDouble(configs.getOrDefault("lowBandDeviationPercentage", "0")),
+ Integer.parseInt(configs.getOrDefault("lowBandPeriodSize", "0")),
+ Double.parseDouble(configs.getOrDefault("highBandValue", "10")),
+ Double.parseDouble(configs.getOrDefault("highBandDeviationPercentage", "0")),
+ Integer.parseInt(configs.getOrDefault("highBandPeriodSize", "0")));
+ break;
+
+ case "step":
+ series = new StepFunctionMetricSeries(
+ Double.parseDouble(configs.getOrDefault("startValue", "10")),
+ Double.parseDouble(configs.getOrDefault("steadyValueDeviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("steadyPeriodSlope", "0")),
+ Integer.parseInt(configs.getOrDefault("steadyPeriodMinSize", "0")),
+ Integer.parseInt(configs.getOrDefault("steadyPeriodMaxSize", "0")),
+ Double.parseDouble(configs.getOrDefault("stepChangePercentage", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("upwardStep", "true")));
+ break;
+
+ case "turbulence":
+ series = new SteadyWithTurbulenceMetricSeries(
+ Integer.parseInt(configs.getOrDefault("approxSeriesLength", "100")),
+ Double.parseDouble(configs.getOrDefault("steadyStateValue", "10")),
+ Double.parseDouble(configs.getOrDefault("steadyStateDeviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationLowerPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationHigherPercentage", "10")),
+ Integer.parseInt(configs.getOrDefault("turbulentPeriodLength", "0")),
+ Integer.parseInt(configs.getOrDefault("turbulentStatePosition", "0")));
+ break;
+
+ default:
+ series = new NormalMetricSeries(0,
+ 1,
+ 0,
+ 0,
+ 0,
+ true);
+ }
+ return series;
+ }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java
new file mode 100644
index 0000000..a883d08
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+public class MonotonicMetricSeries implements AbstractMetricSeries {
+
+ double startValue = 0.0;
+ double slope = 0.5;
+ double deviationPercentage = 0.0;
+ double outlierProbability = 0.0;
+ double outlierDeviationLowerPercentage = 0.0;
+ double outlierDeviationHigherPercentage = 0.0;
+ boolean outliersAboveValue = true;
+
+ Random random = new Random();
+ double nonOutlierProbability;
+
+ // y = mx + c
+ double y;
+ double m;
+ double x;
+ double c;
+
+ public MonotonicMetricSeries(double startValue,
+ double slope,
+ double deviationPercentage,
+ double outlierProbability,
+ double outlierDeviationLowerPercentage,
+ double outlierDeviationHigherPercentage,
+ boolean outliersAboveValue) {
+ this.startValue = startValue;
+ this.slope = slope;
+ this.deviationPercentage = deviationPercentage;
+ this.outlierProbability = outlierProbability;
+ this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage;
+ this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage;
+ this.outliersAboveValue = outliersAboveValue;
+ init();
+ }
+
+ private void init() {
+ y = startValue;
+ m = slope;
+ x = 1;
+ c = y - (m * x);
+ nonOutlierProbability = 1.0 - outlierProbability;
+ }
+
+ @Override
+ public double nextValue() {
+
+ double value;
+ double probability = random.nextDouble();
+
+ y = m * x + c;
+ if (probability <= nonOutlierProbability) {
+ double valueDeviationLowerLimit = y - deviationPercentage * y;
+ double valueDeviationHigherLimit = y + deviationPercentage * y;
+ value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
+ } else {
+ if (outliersAboveValue) {
+ double outlierLowerLimit = y + outlierDeviationLowerPercentage * y;
+ double outlierUpperLimit = y + outlierDeviationHigherPercentage * y;
+ value = outlierLowerLimit + (outlierUpperLimit - outlierLowerLimit) * random.nextDouble();
+ } else {
+ double outlierLowerLimit = y - outlierDeviationLowerPercentage * y;
+ double outlierUpperLimit = y - outlierDeviationHigherPercentage * y;
+ value = outlierUpperLimit + (outlierLowerLimit - outlierUpperLimit) * random.nextDouble();
+ }
+ }
+ x++;
+ return value;
+ }
+
+ @Override
+ public double[] getSeries(int n) {
+ double[] series = new double[n];
+ for (int i = 0; i < n; i++) {
+ series[i] = nextValue();
+ }
+ return series;
+ }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java
new file mode 100644
index 0000000..cc83d2c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+public class NormalMetricSeries implements AbstractMetricSeries {
+
+ double mean = 0.0;
+ double sd = 1.0;
+ double outlierProbability = 0.0;
+ double outlierDeviationSDTimesLower = 0.0;
+ double outlierDeviationSDTimesHigher = 0.0;
+ boolean outlierOnRightEnd = true;
+
+ Random random = new Random();
+ double nonOutlierProbability;
+
+
+ public NormalMetricSeries(double mean,
+ double sd,
+ double outlierProbability,
+ double outlierDeviationSDTimesLower,
+ double outlierDeviationSDTimesHigher,
+ boolean outlierOnRightEnd) {
+ this.mean = mean;
+ this.sd = sd;
+ this.outlierProbability = outlierProbability;
+ this.outlierDeviationSDTimesLower = outlierDeviationSDTimesLower;
+ this.outlierDeviationSDTimesHigher = outlierDeviationSDTimesHigher;
+ this.outlierOnRightEnd = outlierOnRightEnd;
+ init();
+ }
+
+ private void init() {
+ nonOutlierProbability = 1.0 - outlierProbability;
+ }
+
+ @Override
+ public double nextValue() {
+
+ double value;
+ double probability = random.nextDouble();
+
+ if (probability <= nonOutlierProbability) {
+ value = random.nextGaussian() * sd + mean;
+ } else {
+ if (outlierOnRightEnd) {
+ value = mean + (outlierDeviationSDTimesLower + (outlierDeviationSDTimesHigher - outlierDeviationSDTimesLower) * random.nextDouble()) * sd;
+ } else {
+ value = mean - (outlierDeviationSDTimesLower + (outlierDeviationSDTimesHigher - outlierDeviationSDTimesLower) * random.nextDouble()) * sd;
+ }
+ }
+ return value;
+ }
+
+ @Override
+ public double[] getSeries(int n) {
+ double[] series = new double[n];
+ for (int i = 0; i < n; i++) {
+ series[i] = nextValue();
+ }
+ return series;
+ }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java
new file mode 100644
index 0000000..c4ed3ba
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+public class SteadyWithTurbulenceMetricSeries implements AbstractMetricSeries {
+
+ double steadyStateValue = 0.0;
+ double steadyStateDeviationPercentage = 0.0;
+ double turbulentPeriodDeviationLowerPercentage = 0.3;
+ double turbulentPeriodDeviationHigherPercentage = 0.5;
+ int turbulentPeriodLength = 5;
+ int turbulentStatePosition = 1;
+ int approximateSeriesLength = 10;
+
+ Random random = new Random();
+ double valueDeviationLowerLimit;
+ double valueDeviationHigherLimit;
+ double tPeriodLowerLimit;
+ double tPeriodUpperLimit;
+ int tPeriodStartIndex = 0;
+ int index = 0;
+
+ public SteadyWithTurbulenceMetricSeries(int approximateSeriesLength,
+ double steadyStateValue,
+ double steadyStateDeviationPercentage,
+ double turbulentPeriodDeviationLowerPercentage,
+ double turbulentPeriodDeviationHigherPercentage,
+ int turbulentPeriodLength,
+ int turbulentStatePosition) {
+ this.approximateSeriesLength = approximateSeriesLength;
+ this.steadyStateValue = steadyStateValue;
+ this.steadyStateDeviationPercentage = steadyStateDeviationPercentage;
+ this.turbulentPeriodDeviationLowerPercentage = turbulentPeriodDeviationLowerPercentage;
+ this.turbulentPeriodDeviationHigherPercentage = turbulentPeriodDeviationHigherPercentage;
+ this.turbulentPeriodLength = turbulentPeriodLength;
+ this.turbulentStatePosition = turbulentStatePosition;
+ init();
+ }
+
+ private void init() {
+
+ if (turbulentStatePosition == 1) {
+ tPeriodStartIndex = (int) (0.25 * approximateSeriesLength + (0.25 * approximateSeriesLength * random.nextDouble()));
+ } else if (turbulentStatePosition == 2) {
+ tPeriodStartIndex = approximateSeriesLength - turbulentPeriodLength;
+ }
+
+ valueDeviationLowerLimit = steadyStateValue - steadyStateDeviationPercentage * steadyStateValue;
+ valueDeviationHigherLimit = steadyStateValue + steadyStateDeviationPercentage * steadyStateValue;
+
+ tPeriodLowerLimit = steadyStateValue + turbulentPeriodDeviationLowerPercentage * steadyStateValue;
+ tPeriodUpperLimit = steadyStateValue + turbulentPeriodDeviationHigherPercentage * steadyStateValue;
+ }
+
+ @Override
+ public double nextValue() {
+
+ double value;
+
+ if (index >= tPeriodStartIndex && index <= (tPeriodStartIndex + turbulentPeriodLength)) {
+ value = tPeriodLowerLimit + (tPeriodUpperLimit - tPeriodLowerLimit) * random.nextDouble();
+ } else {
+ value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
+ }
+ index++;
+ return value;
+ }
+
+ @Override
+ public double[] getSeries(int n) {
+
+ double[] series = new double[n];
+ int turbulentPeriodStartIndex = 0;
+
+ if (turbulentStatePosition == 1) {
+ turbulentPeriodStartIndex = (int) (0.25 * n + (0.25 * n * random.nextDouble()));
+ } else if (turbulentStatePosition == 2) {
+ turbulentPeriodStartIndex = n - turbulentPeriodLength;
+ }
+
+ double valueDevLowerLimit = steadyStateValue - steadyStateDeviationPercentage * steadyStateValue;
+ double valueDevHigherLimit = steadyStateValue + steadyStateDeviationPercentage * steadyStateValue;
+
+ double turbulentPeriodLowerLimit = steadyStateValue + turbulentPeriodDeviationLowerPercentage * steadyStateValue;
+ double turbulentPeriodUpperLimit = steadyStateValue + turbulentPeriodDeviationHigherPercentage * steadyStateValue;
+
+ for (int i = 0; i < n; i++) {
+ if (i >= turbulentPeriodStartIndex && i < (turbulentPeriodStartIndex + turbulentPeriodLength)) {
+ series[i] = turbulentPeriodLowerLimit + (turbulentPeriodUpperLimit - turbulentPeriodLowerLimit) * random.nextDouble();
+ } else {
+ series[i] = valueDevLowerLimit + (valueDevHigherLimit - valueDevLowerLimit) * random.nextDouble();
+ }
+ }
+
+ return series;
+ }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
new file mode 100644
index 0000000..d5beb48
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+public class StepFunctionMetricSeries implements AbstractMetricSeries {
+
+ double startValue = 0.0;
+ double steadyValueDeviationPercentage = 0.0;
+ double steadyPeriodSlope = 0.5;
+ int steadyPeriodMinSize = 10;
+ int steadyPeriodMaxSize = 20;
+ double stepChangePercentage = 0.0;
+ boolean upwardStep = true;
+
+ Random random = new Random();
+
+ // y = mx + c
+ double y;
+ double m;
+ double x;
+ double c;
+ int currentStepSize;
+ int currentIndex;
+
+ public StepFunctionMetricSeries(double startValue,
+ double steadyValueDeviationPercentage,
+ double steadyPeriodSlope,
+ int steadyPeriodMinSize,
+ int steadyPeriodMaxSize,
+ double stepChangePercentage,
+ boolean upwardStep) {
+ this.startValue = startValue;
+ this.steadyValueDeviationPercentage = steadyValueDeviationPercentage;
+ this.steadyPeriodSlope = steadyPeriodSlope;
+ this.steadyPeriodMinSize = steadyPeriodMinSize;
+ this.steadyPeriodMaxSize = steadyPeriodMaxSize;
+ this.stepChangePercentage = stepChangePercentage;
+ this.upwardStep = upwardStep;
+ init();
+ }
+
+ private void init() {
+ y = startValue;
+ m = steadyPeriodSlope;
+ x = 1;
+ c = y - (m * x);
+
+ currentStepSize = (int) (steadyPeriodMinSize + (steadyPeriodMaxSize - steadyPeriodMinSize) * random.nextDouble());
+ currentIndex = 0;
+ }
+
+ @Override
+ public double nextValue() {
+
+ double value = 0.0;
+
+ if (currentIndex < currentStepSize) {
+ y = m * x + c;
+ double valueDeviationLowerLimit = y - steadyValueDeviationPercentage * y;
+ double valueDeviationHigherLimit = y + steadyValueDeviationPercentage * y;
+ value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
+ x++;
+ currentIndex++;
+ }
+
+ if (currentIndex == currentStepSize) {
+ currentIndex = 0;
+ currentStepSize = (int) (steadyPeriodMinSize + (steadyPeriodMaxSize - steadyPeriodMinSize) * random.nextDouble());
+ if (upwardStep) {
+ y = y + stepChangePercentage * y;
+ } else {
+ y = y - stepChangePercentage * y;
+ }
+ x = 1;
+ c = y - (m * x);
+ }
+
+ return value;
+ }
+
+ @Override
+ public double[] getSeries(int n) {
+ double[] series = new double[n];
+ for (int i = 0; i < n; i++) {
+ series[i] = nextValue();
+ }
+ return series;
+ }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
new file mode 100644
index 0000000..a2b0eea
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+public class UniformMetricSeries implements AbstractMetricSeries {
+
+ double value = 0.0;
+ double deviationPercentage = 0.0;
+ double outlierProbability = 0.0;
+ double outlierDeviationLowerPercentage = 0.0;
+ double outlierDeviationHigherPercentage = 0.0;
+ boolean outliersAboveValue= true;
+
+ Random random = new Random();
+ double valueDeviationLowerLimit;
+ double valueDeviationHigherLimit;
+ double outlierLeftLowerLimit;
+ double outlierLeftHigherLimit;
+ double outlierRightLowerLimit;
+ double outlierRightUpperLimit;
+ double nonOutlierProbability;
+
+
+ public UniformMetricSeries(double value,
+ double deviationPercentage,
+ double outlierProbability,
+ double outlierDeviationLowerPercentage,
+ double outlierDeviationHigherPercentage,
+ boolean outliersAboveValue) {
+ this.value = value;
+ this.deviationPercentage = deviationPercentage;
+ this.outlierProbability = outlierProbability;
+ this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage;
+ this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage;
+ this.outliersAboveValue = outliersAboveValue;
+ init();
+ }
+
+ private void init() {
+ valueDeviationLowerLimit = value - deviationPercentage * value;
+ valueDeviationHigherLimit = value + deviationPercentage * value;
+
+ outlierLeftLowerLimit = value - outlierDeviationHigherPercentage * value;
+ outlierLeftHigherLimit = value - outlierDeviationLowerPercentage * value;
+ outlierRightLowerLimit = value + outlierDeviationLowerPercentage * value;
+ outlierRightUpperLimit = value + outlierDeviationHigherPercentage * value;
+
+ nonOutlierProbability = 1.0 - outlierProbability;
+ }
+
+ @Override
+ public double nextValue() {
+
+ double value;
+ double probability = random.nextDouble();
+
+ if (probability <= nonOutlierProbability) {
+ value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
+ } else {
+ if (!outliersAboveValue) {
+ value = outlierLeftLowerLimit + (outlierLeftHigherLimit - outlierLeftLowerLimit) * random.nextDouble();
+ } else {
+ value = outlierRightLowerLimit + (outlierRightUpperLimit - outlierRightLowerLimit) * random.nextDouble();
+ }
+ }
+ return value;
+ }
+
+ @Override
+ public double[] getSeries(int n) {
+ double[] series = new double[n];
+ for (int i = 0; i < n; i++) {
+ series[i] = nextValue();
+ }
+ return series;
+ }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java
deleted file mode 100644
index d65790e..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.spark;
-
-import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.codehaus.jackson.map.AnnotationIntrospector;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.net.HttpURLConnection;
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.*;
-
-public class AnomalyMetricPublisher implements Serializable {
-
- private String hostName = "UNKNOWN.example.com";
- private String instanceId = null;
- private String serviceName = "anomaly-engine";
- private String collectorHost;
- private String protocol;
- private String port;
- private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
- private static final Log LOG = LogFactory.getLog(AnomalyMetricPublisher.class);
- private static ObjectMapper mapper;
-
- static {
- mapper = new ObjectMapper();
- AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
- mapper.setAnnotationIntrospector(introspector);
- mapper.getSerializationConfig()
- .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
- }
-
- public AnomalyMetricPublisher(String collectorHost, String protocol, String port) {
- this.collectorHost = collectorHost;
- this.protocol = protocol;
- this.port = port;
- this.hostName = getDefaultLocalHostName();
- }
-
- private String getDefaultLocalHostName() {
- try {
- return InetAddress.getLocalHost().getCanonicalHostName();
- } catch (UnknownHostException e) {
- LOG.info("Error getting host address");
- }
- return null;
- }
-
- public void publish(List<MetricAnomaly> metricAnomalies) {
- LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
- List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
- if (!metricList.isEmpty()) {
- TimelineMetrics timelineMetrics = new TimelineMetrics();
- timelineMetrics.setMetrics(metricList);
- emitMetrics(timelineMetrics);
- }
- }
-
- private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
- List<TimelineMetric> metrics = new ArrayList<>();
-
- if (metricAnomalies.isEmpty()) {
- return metrics;
- }
-
- long currentTime = System.currentTimeMillis();
- MetricAnomaly prevAnomaly = metricAnomalies.get(0);
-
- TimelineMetric timelineMetric = new TimelineMetric();
- timelineMetric.setMetricName(prevAnomaly.getMetricKey() + "_" + prevAnomaly.getMethodResult().getMethodType());
- timelineMetric.setAppId(serviceName);
- timelineMetric.setInstanceId(instanceId);
- timelineMetric.setHostName(hostName);
- timelineMetric.setStartTime(currentTime);
-
- TreeMap<Long,Double> metricValues = new TreeMap<>();
- metricValues.put(prevAnomaly.getTimestamp(), prevAnomaly.getMetricValue());
- MetricAnomaly currentAnomaly;
-
- for (int i = 1; i < metricAnomalies.size(); i++) {
- currentAnomaly = metricAnomalies.get(i);
- if (currentAnomaly.getMetricKey().equals(prevAnomaly.getMetricKey())) {
- metricValues.put(currentAnomaly.getTimestamp(), currentAnomaly.getMetricValue());
- } else {
- timelineMetric.setMetricValues(metricValues);
- metrics.add(timelineMetric);
-
- timelineMetric = new TimelineMetric();
- timelineMetric.setMetricName(currentAnomaly.getMetricKey() + "_" + currentAnomaly.getMethodResult().getMethodType());
- timelineMetric.setAppId(serviceName);
- timelineMetric.setInstanceId(instanceId);
- timelineMetric.setHostName(hostName);
- timelineMetric.setStartTime(currentTime);
- metricValues = new TreeMap<>();
- metricValues.put(currentAnomaly.getTimestamp(), currentAnomaly.getMetricValue());
- prevAnomaly = currentAnomaly;
- }
- }
-
- timelineMetric.setMetricValues(metricValues);
- metrics.add(timelineMetric);
- return metrics;
- }
-
- private boolean emitMetrics(TimelineMetrics metrics) {
- String connectUrl = constructTimelineMetricUri();
- String jsonData = null;
- LOG.info("EmitMetrics connectUrl = " + connectUrl);
- try {
- jsonData = mapper.writeValueAsString(metrics);
- } catch (IOException e) {
- LOG.error("Unable to parse metrics", e);
- }
- if (jsonData != null) {
- return emitMetricsJson(connectUrl, jsonData);
- }
- return false;
- }
-
- private HttpURLConnection getConnection(String spec) throws IOException {
- return (HttpURLConnection) new URL(spec).openConnection();
- }
-
- private boolean emitMetricsJson(String connectUrl, String jsonData) {
- int timeout = 10000;
- HttpURLConnection connection = null;
- try {
- if (connectUrl == null) {
- throw new IOException("Unknown URL. Unable to connect to metrics collector.");
- }
- connection = getConnection(connectUrl);
-
- connection.setRequestMethod("POST");
- connection.setRequestProperty("Content-Type", "application/json");
- connection.setRequestProperty("Connection", "Keep-Alive");
- connection.setConnectTimeout(timeout);
- connection.setReadTimeout(timeout);
- connection.setDoOutput(true);
-
- if (jsonData != null) {
- try (OutputStream os = connection.getOutputStream()) {
- os.write(jsonData.getBytes("UTF-8"));
- }
- }
-
- int statusCode = connection.getResponseCode();
-
- if (statusCode != 200) {
- LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
- "statusCode = " + statusCode);
- } else {
- LOG.info("Metrics posted to Collector " + connectUrl);
- }
- return true;
- } catch (IOException ioe) {
- LOG.error(ioe.getMessage());
- }
- return false;
- }
-
- private String constructTimelineMetricUri() {
- StringBuilder sb = new StringBuilder(protocol);
- sb.append("://");
- sb.append(collectorHost);
- sb.append(":");
- sb.append(port);
- sb.append(WS_V1_TIMELINE_METRICS);
- return sb.toString();
- }
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java
deleted file mode 100644
index 3989c67..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.spark;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
-import org.apache.ambari.metrics.alertservice.methods.ema.EmaModel;
-import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel;
-import org.apache.ambari.metrics.alertservice.methods.ema.EmaModelLoader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-import scala.Tuple2;
-
-import java.util.*;
-
-public class MetricAnomalyDetector {
-
- private static final Log LOG = LogFactory.getLog(MetricAnomalyDetector.class);
- private static String groupId = "ambari-metrics-group";
- private static String topicName = "ambari-metrics-topic";
- private static int numThreads = 1;
-
- //private static String zkQuorum = "avijayan-ams-1.openstacklocal:2181,avijayan-ams-2.openstacklocal:2181,avijayan-ams-3.openstacklocal:2181";
- //private static Map<String, String> kafkaParams = new HashMap<>();
- //static {
- // kafkaParams.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "avijayan-ams-2.openstacklocal:6667");
- // kafkaParams.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- // kafkaParams.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonSerializer");
- // kafkaParams.put("metadata.broker.list", "avijayan-ams-2.openstacklocal:6667");
- //}
-
- public MetricAnomalyDetector() {
- }
-
- public static void main(String[] args) throws InterruptedException {
-
-
- if (args.length < 6) {
- System.err.println("Usage: MetricAnomalyDetector <method1,method2> <appid1,appid2> <collector_host> <port> <protocol> <zkQuorum>");
- System.exit(1);
- }
-
- List<String> appIds = Arrays.asList(args[1].split(","));
- String collectorHost = args[2];
- String collectorPort = args[3];
- String collectorProtocol = args[4];
- String zkQuorum = args[5];
-
- List<MetricAnomalyModel> anomalyDetectionModels = new ArrayList<>();
- AnomalyMetricPublisher anomalyMetricPublisher = new AnomalyMetricPublisher(collectorHost, collectorProtocol, collectorPort);
-
- SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector");
-
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
-
- for (String method : args[0].split(",")) {
- if (method.equals("ema")) {
- LOG.info("Model EMA requested.");
- EmaModel emaModel = new EmaModelLoader().load(jssc.sparkContext().sc(), "/tmp/model/ema");
- anomalyDetectionModels.add(emaModel);
- }
- }
-
- JavaPairReceiverInputDStream<String, String> messages =
- KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));
-
- //Convert JSON string to TimelineMetrics.
- JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() {
- @Override
- public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
- ObjectMapper mapper = new ObjectMapper();
- TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class);
- return metrics;
- }
- });
-
- //Group TimelineMetric by AppId.
- JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
- timelineMetrics -> new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(),timelineMetrics)
- );
-
- appMetricStream.print();
-
- //Filter AppIds that are not needed.
- JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
- @Override
- public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
- return appIds.contains(appMetricTuple._1);
- }
- });
-
- filteredAppMetricStream.print();
-
- filteredAppMetricStream.foreachRDD(rdd -> {
- rdd.foreach(
- tuple2 -> {
- TimelineMetrics metrics = tuple2._2();
- for (TimelineMetric metric : metrics.getMetrics()) {
-
- TimelineMetric timelineMetric =
- new TimelineMetric(metric.getMetricName(), metric.getAppId(), metric.getHostName(), metric.getMetricValues());
-
- for (MetricAnomalyModel model : anomalyDetectionModels) {
- List<MetricAnomaly> anomalies = model.test(timelineMetric);
- anomalyMetricPublisher.publish(anomalies);
- for (MetricAnomaly anomaly : anomalies) {
- LOG.info(anomaly.getAnomalyAsString());
- }
-
- }
- }
- });
- });
-
- jssc.start();
- jssc.awaitTermination();
- }
-}
-
-
-
-
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
index b25e79d..bca3366 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
@@ -25,12 +25,9 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval
granularity <- train_data[2,1] - train_data[1,1]
test_start <- test_data[1,1]
test_end <- test_data[length(test_data[1,]),1]
- cat ("\n test_start : ", as.numeric(test_start))
train_start <- test_start - num_historic_periods*period
- cat ("\n train_start : ", as.numeric(train_start))
# round to start of day
train_start <- train_start - (train_start %% interval)
- cat ("\n train_start after rounding: ", as.numeric(train_start))
time <- as.POSIXlt(as.numeric(test_data[1,1])/1000, origin = "1970-01-01" ,tz = "GMT")
test_data_day <- time$wday
@@ -39,7 +36,6 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval
for ( i in length(train_data[,1]):1) {
ts <- train_data[i,1]
if ( ts < train_start) {
- cat ("\n Breaking out of loop : ", ts)
break
}
time <- as.POSIXlt(as.numeric(ts)/1000, origin = "1970-01-01" ,tz = "GMT")
@@ -49,20 +45,14 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval
}
}
- cat ("\n Train data length : ", length(train_data[,1]))
- cat ("\n Test data length : ", length(test_data[,1]))
- cat ("\n Historic data length : ", length(h_data))
if (length(h_data) < 2*length(test_data[,1])) {
cat ("\nNot enough training data")
return (anomalies)
}
past_median <- median(h_data)
- cat ("\npast_median : ", past_median)
past_sd <- sd(h_data)
- cat ("\npast_sd : ", past_sd)
curr_median <- median(test_data[,2])
- cat ("\ncurr_median : ", curr_median)
if (abs(curr_median - past_median) > n * past_sd) {
anomaly <- c(test_start, test_end, curr_median, past_median, past_sd)
@@ -70,7 +60,7 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval
}
if(length(anomalies) > 0) {
- names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", " Past SD")
+ names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", "Past SD")
}
return (anomalies)
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
index b4dfdcb..f22bc15 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
@@ -24,7 +24,7 @@ ams_ks <- function(train_data, test_data, p_value) {
# test_data <- data[which(data$TS >= test_start & data$TS <= test_end), 2]
anomalies <- data.frame()
- res <- ks.test(train_data, test_data[,2])
+ res <- ks.test(train_data[,2], test_data[,2])
if (res[2] < p_value) {
anomaly <- c(test_data[1,1], test_data[length(test_data),1], res[1], res[2])
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
index 7fffbdd..f33b6ec 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
@@ -32,12 +32,17 @@ ams_tukeys <- function(train_data, test_data, n) {
lb <- quantiles[2] - n*iqr
ub <- quantiles[4] + n*iqr
if ( (x < lb) || (x > ub) ) {
- anomaly <- c(test_data[i,1], x)
+ if (x < lb) {
+ niqr <- (quantiles[2] - x) / iqr
+ } else {
+ niqr <- (x - quantiles[4]) / iqr
+ }
+ anomaly <- c(test_data[i,1], x, niqr)
anomalies <- rbind(anomalies, anomaly)
}
}
if(length(anomalies) > 0) {
- names(anomalies) <- c("TS", "Value")
+ names(anomalies) <- c("TS", "Value", "niqr")
}
return (anomalies)
}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R
deleted file mode 100644
index 3827006..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#url_prefix = 'http://104.196.95.78:3000/api/datasources/proxy/1/ws/v1/timeline/metrics?'
-#url_suffix = '&startTime=1459972944&endTime=1491508944&precision=MINUTES'
-#data_url <- paste(url_prefix, query, sep ="")
-#data_url <- paste(data_url, url_suffix, sep="")
-
-get_data <- function(url) {
- library(rjson)
- res <- fromJSON(readLines(url)[1])
- return (res)
-}
-
-find_index <- function(data, ts) {
- for (i in 1:length(data)) {
- if (as.numeric(ts) == as.numeric(data[i])) {
- return (i)
- }
- }
- return (-1)
-}
\ No newline at end of file
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
new file mode 100644
index 0000000..539ca40
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.TreeMap;
+
+public class TestEmaTechnique {
+
+ @Test
+ public void testEmaInitialization() {
+
+ EmaTechnique ema = new EmaTechnique(0.5, 3);
+ Assert.assertTrue(ema.getTrackedEmas().isEmpty());
+ Assert.assertTrue(ema.getStartingWeight() == 0.5);
+ Assert.assertTrue(ema.getStartTimesSdev() == 3);
+ }
+
+ @Test
+ public void testEma() {
+ EmaTechnique ema = new EmaTechnique(0.5, 3);
+
+ long now = System.currentTimeMillis();
+
+ TimelineMetric metric1 = new TimelineMetric();
+ metric1.setMetricName("M1");
+ metric1.setHostName("H1");
+ metric1.setStartTime(now - 1000);
+ metric1.setAppId("A1");
+ metric1.setInstanceId(null);
+ metric1.setType("Integer");
+
+ //Train
+ TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+ for (int i = 0; i < 50; i++) {
+ double metric = 20000 + Math.random();
+ metricValues.put(now - i * 100, metric);
+ }
+ metric1.setMetricValues(metricValues);
+ List<MetricAnomaly> anomalyList = ema.test(metric1);
+// Assert.assertTrue(anomalyList.isEmpty());
+
+ metricValues = new TreeMap<Long, Double>();
+ for (int i = 0; i < 50; i++) {
+ double metric = 20000 + Math.random();
+ metricValues.put(now - i * 100, metric);
+ }
+ metric1.setMetricValues(metricValues);
+ anomalyList = ema.test(metric1);
+ Assert.assertTrue(!anomalyList.isEmpty());
+ int l1 = anomalyList.size();
+
+ Assert.assertTrue(ema.updateModel(metric1, false, 20));
+ anomalyList = ema.test(metric1);
+ int l2 = anomalyList.size();
+ Assert.assertTrue(l2 < l1);
+
+ Assert.assertTrue(ema.updateModel(metric1, true, 50));
+ anomalyList = ema.test(metric1);
+ int l3 = anomalyList.size();
+ Assert.assertTrue(l3 > l2 && l3 > l1);
+
+ }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java
new file mode 100644
index 0000000..9a102a0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.seriesgenerator.UniformMetricSeries;
+import org.apache.commons.lang.ArrayUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class TestRFunctionInvoker {
+
+ private static String metricName = "TestMetric";
+ private static double[] ts;
+ private static String fullFilePath;
+
+ @BeforeClass
+ public static void init() throws URISyntaxException {
+
+ Assume.assumeTrue(System.getenv("R_HOME") != null);
+ ts = getTS(1000);
+ URL url = ClassLoader.getSystemResource("R-scripts");
+ fullFilePath = new File(url.toURI()).getAbsolutePath();
+ RFunctionInvoker.setScriptsDir(fullFilePath);
+ }
+
+ @Test
+ public void testTukeys() throws URISyntaxException {
+
+ double[] train_ts = ArrayUtils.subarray(ts, 0, 750);
+ double[] train_x = getRandomData(750);
+ DataSeries trainData = new DataSeries(metricName, train_ts, train_x);
+
+ double[] test_ts = ArrayUtils.subarray(ts, 750, 1000);
+ double[] test_x = getRandomData(250);
+ test_x[50] = 5.5; //Anomaly
+ DataSeries testData = new DataSeries(metricName, test_ts, test_x);
+ Map<String, String> configs = new HashMap();
+ configs.put("tukeys.n", "3");
+
+ ResultSet rs = RFunctionInvoker.tukeys(trainData, testData, configs);
+ Assert.assertEquals(rs.resultset.size(), 2);
+ Assert.assertEquals(rs.resultset.get(1)[0], 5.5, 0.1);
+
+ }
+
+ public static void main(String[] args) throws URISyntaxException {
+
+ String metricName = "TestMetric";
+ double[] ts = getTS(1000);
+ URL url = ClassLoader.getSystemResource("R-scripts");
+ String fullFilePath = new File(url.toURI()).getAbsolutePath();
+ RFunctionInvoker.setScriptsDir(fullFilePath);
+
+ double[] train_ts = ArrayUtils.subarray(ts, 0, 750);
+ double[] train_x = getRandomData(750);
+ DataSeries trainData = new DataSeries(metricName, train_ts, train_x);
+
+ double[] test_ts = ArrayUtils.subarray(ts, 750, 1000);
+ double[] test_x = getRandomData(250);
+ test_x[50] = 5.5; //Anomaly
+ DataSeries testData = new DataSeries(metricName, test_ts, test_x);
+ ResultSet rs;
+
+ Map<String, String> configs = new HashMap();
+
+ System.out.println("TUKEYS");
+ configs.put("tukeys.n", "3");
+ rs = RFunctionInvoker.tukeys(trainData, testData, configs);
+ rs.print();
+ System.out.println("--------------");
+
+// System.out.println("EMA Global");
+// configs.put("ema.n", "3");
+// configs.put("ema.w", "0.8");
+// rs = RFunctionInvoker.ema_global(trainData, testData, configs);
+// rs.print();
+// System.out.println("--------------");
+//
+// System.out.println("EMA Daily");
+// rs = RFunctionInvoker.ema_daily(trainData, testData, configs);
+// rs.print();
+// System.out.println("--------------");
+//
+// configs.put("ks.p_value", "0.00005");
+// System.out.println("KS Test");
+// rs = RFunctionInvoker.ksTest(trainData, testData, configs);
+// rs.print();
+// System.out.println("--------------");
+//
+ ts = getTS(5000);
+ train_ts = ArrayUtils.subarray(ts, 0, 4800);
+ train_x = getRandomData(4800);
+ trainData = new DataSeries(metricName, train_ts, train_x);
+ test_ts = ArrayUtils.subarray(ts, 4800, 5000);
+ test_x = getRandomData(200);
+ for (int i = 0; i < 200; i++) {
+ test_x[i] = test_x[i] * 5;
+ }
+ testData = new DataSeries(metricName, test_ts, test_x);
+ configs.put("hsdev.n", "3");
+ configs.put("hsdev.nhp", "3");
+ configs.put("hsdev.interval", "86400000");
+ configs.put("hsdev.period", "604800000");
+ System.out.println("HSdev");
+ rs = RFunctionInvoker.hsdev(trainData, testData, configs);
+ rs.print();
+ System.out.println("--------------");
+
+ }
+
+ static double[] getTS(int n) {
+ long currentTime = System.currentTimeMillis();
+ double[] ts = new double[n];
+ currentTime = currentTime - (currentTime % (5 * 60 * 1000));
+
+ for (int i = 0, j = n - 1; i < n; i++, j--) {
+ ts[j] = currentTime;
+ currentTime = currentTime - (5 * 60 * 1000);
+ }
+ return ts;
+ }
+
+ static double[] getRandomData(int n) {
+
+ UniformMetricSeries metricSeries = new UniformMetricSeries(10, 0.1,0.05, 0.6, 0.8, true);
+ return metricSeries.getSeries(n);
+
+// double[] metrics = new double[n];
+// Random random = new Random();
+// for (int i = 0; i < n; i++) {
+// metrics[i] = random.nextDouble();
+// }
+// return metrics;
+ }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
new file mode 100644
index 0000000..bb409cf
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.TreeMap;
+
+public class TestTukeys {
+
+ @BeforeClass
+ public static void init() throws URISyntaxException {
+ Assume.assumeTrue(System.getenv("R_HOME") != null);
+ }
+
+ @Test
+ public void testPointInTimeDetectionSystem() throws UnknownHostException, URISyntaxException {
+
+ URL url = ClassLoader.getSystemResource("R-scripts");
+ String fullFilePath = new File(url.toURI()).getAbsolutePath();
+ RFunctionInvoker.setScriptsDir(fullFilePath);
+
+ MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface("avijayan-ams-1.openstacklocal","http", "6188");
+
+ EmaTechnique ema = new EmaTechnique(0.5, 3);
+ long now = System.currentTimeMillis();
+
+ TimelineMetric metric1 = new TimelineMetric();
+ metric1.setMetricName("mm9");
+ metric1.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+ metric1.setStartTime(now);
+ metric1.setAppId("aa9");
+ metric1.setInstanceId(null);
+ metric1.setType("Integer");
+
+ //Train
+ TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+
+ //2hr data.
+ for (int i = 0; i < 120; i++) {
+ double metric = 20000 + Math.random();
+ metricValues.put(now - i * 60 * 1000, metric);
+ }
+ metric1.setMetricValues(metricValues);
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.addOrMergeTimelineMetric(metric1);
+
+ metricsCollectorInterface.emitMetrics(timelineMetrics);
+
+ List<MetricAnomaly> anomalyList = ema.test(metric1);
+ metricsCollectorInterface.publish(anomalyList);
+//
+// PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(ema, metricsCollectorInterface, 3, 5*60*1000, 15*60*1000);
+// pointInTimeADSystem.runOnce();
+//
+// List<MetricAnomaly> anomalyList2 = ema.test(metric1);
+//
+// pointInTimeADSystem.runOnce();
+// List<MetricAnomaly> anomalyList3 = ema.test(metric1);
+//
+// pointInTimeADSystem.runOnce();
+// List<MetricAnomaly> anomalyList4 = ema.test(metric1);
+//
+// pointInTimeADSystem.runOnce();
+// List<MetricAnomaly> anomalyList5 = ema.test(metric1);
+//
+// pointInTimeADSystem.runOnce();
+// List<MetricAnomaly> anomalyList6 = ema.test(metric1);
+//
+// Assert.assertTrue(anomalyList6.size() < anomalyList.size());
+ }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java
new file mode 100644
index 0000000..575ea8b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ambari.metrics.alertservice.prototype.MetricAnomalyDetectorTestInput;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetricSeriesGeneratorTest {
+
+ @Test
+ public void testUniformSeries() {
+
+ UniformMetricSeries metricSeries = new UniformMetricSeries(5, 0.2, 0, 0, 0, true);
+ Assert.assertTrue(metricSeries.nextValue() <= 6 && metricSeries.nextValue() >= 4);
+
+ double[] uniformSeries = MetricSeriesGeneratorFactory.createUniformSeries(50, 10, 0.2, 0.1, 0.4, 0.5, true);
+ Assert.assertTrue(uniformSeries.length == 50);
+
+ for (int i = 0; i < uniformSeries.length; i++) {
+ double value = uniformSeries[i];
+
+ if (value > 10 * 1.2) {
+ Assert.assertTrue(value >= 10 * 1.4 && value <= 10 * 1.6);
+ } else {
+ Assert.assertTrue(value >= 10 * 0.8 && value <= 10 * 1.2);
+ }
+ }
+ }
+
+ @Test
+ public void testNormalSeries() {
+ NormalMetricSeries metricSeries = new NormalMetricSeries(0, 1, 0, 0, 0, true);
+ Assert.assertTrue(metricSeries.nextValue() <= 3 && metricSeries.nextValue() >= -3);
+ }
+
+ @Test
+ public void testMonotonicSeries() {
+
+ MonotonicMetricSeries metricSeries = new MonotonicMetricSeries(0, 0.5, 0, 0, 0, 0, true);
+ Assert.assertTrue(metricSeries.nextValue() == 0);
+ Assert.assertTrue(metricSeries.nextValue() == 0.5);
+
+ double[] incSeries = MetricSeriesGeneratorFactory.createMonotonicSeries(20, 0, 0.5, 0, 0, 0, 0, true);
+ Assert.assertTrue(incSeries.length == 20);
+ for (int i = 0; i < incSeries.length; i++) {
+ Assert.assertTrue(incSeries[i] == i * 0.5);
+ }
+ }
+
+ @Test
+ public void testDualBandSeries() {
+ double[] dualBandSeries = MetricSeriesGeneratorFactory.getDualBandSeries(30, 5, 0.2, 5, 15, 0.3, 4);
+ Assert.assertTrue(dualBandSeries[0] >= 4 && dualBandSeries[0] <= 6);
+ Assert.assertTrue(dualBandSeries[4] >= 4 && dualBandSeries[4] <= 6);
+ Assert.assertTrue(dualBandSeries[5] >= 10.5 && dualBandSeries[5] <= 19.5);
+ Assert.assertTrue(dualBandSeries[8] >= 10.5 && dualBandSeries[8] <= 19.5);
+ Assert.assertTrue(dualBandSeries[9] >= 4 && dualBandSeries[9] <= 6);
+ }
+
+ @Test
+ public void testStepSeries() {
+ double[] stepSeries = MetricSeriesGeneratorFactory.getStepFunctionSeries(30, 10, 0, 0, 5, 5, 0.5, true);
+
+ Assert.assertTrue(stepSeries[0] == 10);
+ Assert.assertTrue(stepSeries[4] == 10);
+
+ Assert.assertTrue(stepSeries[5] == 10*1.5);
+ Assert.assertTrue(stepSeries[9] == 10*1.5);
+
+ Assert.assertTrue(stepSeries[10] == 10*1.5*1.5);
+ Assert.assertTrue(stepSeries[14] == 10*1.5*1.5);
+ }
+
+ @Test
+ public void testSteadySeriesWithTurbulence() {
+ double[] steadySeriesWithTurbulence = MetricSeriesGeneratorFactory.getSteadySeriesWithTurbulentPeriod(30, 5, 0, 1, 1, 5, 1);
+
+ int count = 0;
+ for (int i = 0; i < steadySeriesWithTurbulence.length; i++) {
+ if (steadySeriesWithTurbulence[i] == 10) {
+ count++;
+ }
+ }
+ Assert.assertTrue(count == 5);
+ }
+}
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
index 1f03fe9..3dfcf4e 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.metrics2.sink.timeline;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
@@ -34,11 +35,11 @@ import org.codehaus.jackson.map.annotate.JsonDeserialize;
@XmlAccessorType(XmlAccessType.NONE)
@InterfaceAudience.Public
@InterfaceStability.Unstable
-public class TimelineMetric implements Comparable<TimelineMetric> {
+public class TimelineMetric implements Comparable<TimelineMetric>, Serializable {
private String metricName;
private String appId;
- private String instanceId;
+ private String instanceId = null;
private String hostName;
private long startTime;
private String type;
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
index 0c5965c..a8d3da8 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.metrics2.sink.timeline;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -35,7 +36,7 @@ import org.apache.hadoop.classification.InterfaceStability;
@XmlAccessorType(XmlAccessType.NONE)
@InterfaceAudience.Public
@InterfaceStability.Unstable
-public class TimelineMetrics {
+public class TimelineMetrics implements Serializable{
private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
diff --git a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
index bff094b..e51a47f 100644
--- a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
+++ b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
@@ -21,13 +21,13 @@ import java.util
import java.util.logging.LogManager
import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.ambari.metrics.alertservice.prototype.MetricsCollectorInterface
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
-import org.apache.ambari.metrics.alertservice.common.{MetricAnomaly, TimelineMetrics}
-import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel
-import org.apache.ambari.metrics.alertservice.methods.ema.{EmaModel, EmaModelLoader}
-import org.apache.ambari.metrics.alertservice.spark.AnomalyMetricPublisher
+import org.apache.ambari.metrics.alertservice.prototype.methods.{AnomalyDetectionTechnique, MetricAnomaly}
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.{EmaModelLoader, EmaTechnique}
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics
import org.apache.log4j.Logger
import org.apache.spark.storage.StorageLevel
@@ -41,7 +41,7 @@ object MetricAnomalyDetector extends Logging {
var groupId = "ambari-metrics-group"
var topicName = "ambari-metrics-topic"
var numThreads = 1
- val anomalyDetectionModels: Array[MetricAnomalyModel] = Array[MetricAnomalyModel]()
+ val anomalyDetectionModels: Array[AnomalyDetectionTechnique] = Array[AnomalyDetectionTechnique]()
def main(args: Array[String]): Unit = {
@@ -54,7 +54,7 @@ object MetricAnomalyDetector extends Logging {
}
for (method <- args(0).split(",")) {
- if (method == "ema") anomalyDetectionModels :+ new EmaModel()
+ if (method == "ema") anomalyDetectionModels :+ new EmaTechnique(0.5, 3)
}
val appIds = util.Arrays.asList(args(1).split(","))
@@ -63,7 +63,7 @@ object MetricAnomalyDetector extends Logging {
val collectorPort = args(3)
val collectorProtocol = args(4)
- val anomalyMetricPublisher: AnomalyMetricPublisher = new AnomalyMetricPublisher(collectorHost, collectorProtocol, collectorPort)
+ val anomalyMetricPublisher: MetricsCollectorInterface = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort)
val sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector")
@@ -99,10 +99,6 @@ object MetricAnomalyDetector extends Logging {
for (timelineMetric <- timelineMetrics.getMetrics) {
var anomalies = emaModel.test(timelineMetric)
anomalyMetricPublisher.publish(anomalies)
- for (anomaly <- anomalies) {
- var an = anomaly : MetricAnomaly
- logger.info(an.getAnomalyAsString)
- }
}
})
})
diff --git a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
index 3c8e1ed..edd6366 100644
--- a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
+++ b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
@@ -17,8 +17,8 @@
package org.apache.ambari.metrics.spark
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric
-import org.apache.ambari.metrics.alertservice.methods.ema.EmaModel
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
@@ -61,15 +61,19 @@ object SparkPhoenixReader {
t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5))
)
- //val metricName = result.head().getString(0)
+ //val seriesName = result.head().getString(0)
//val hostname = result.head().getString(1)
//val appId = result.head().getString(2)
- val timelineMetric = new TimelineMetric(metricName, appId, hostname, metricValues)
+ val timelineMetric = new TimelineMetric()
+ timelineMetric.setMetricName(metricName)
+ timelineMetric.setAppId(appId)
+ timelineMetric.setHostName(hostname)
+ timelineMetric.setMetricValues(metricValues)
- var emaModel = new EmaModel()
- emaModel.train(timelineMetric, weight, timessdev)
- emaModel.save(sc, modelDir)
+// var emaModel = new EmaTechnique()
+// emaModel.train(timelineMetric, weight, timessdev)
+// emaModel.save(sc, modelDir)
// var metricData:Seq[Double] = Seq.empty
// result.collect().foreach(
diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
index d306ad3..a8ac1da 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
@@ -348,7 +348,7 @@
<dependency>
<groupId>org.apache.ambari</groupId>
<artifactId>ambari-metrics-alertservice</artifactId>
- <version>2.0.0.0-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
index 110b094..95682f9 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@ -407,30 +407,6 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
}
- private org.apache.ambari.metrics.alertservice.common.TimelineMetrics fromTimelineMetrics(TimelineMetrics timelineMetrics) {
- org.apache.ambari.metrics.alertservice.common.TimelineMetrics otherMetrics = new org.apache.ambari.metrics.alertservice.common.TimelineMetrics();
-
- List<org.apache.ambari.metrics.alertservice.common.TimelineMetric> timelineMetricList = new ArrayList<>();
- for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
- timelineMetricList.add(fromTimelineMetric(timelineMetric));
- }
- otherMetrics.setMetrics(timelineMetricList);
- return otherMetrics;
- }
-
- private org.apache.ambari.metrics.alertservice.common.TimelineMetric fromTimelineMetric(TimelineMetric timelineMetric) {
-
- org.apache.ambari.metrics.alertservice.common.TimelineMetric otherMetric = new org.apache.ambari.metrics.alertservice.common.TimelineMetric();
- otherMetric.setMetricValues(timelineMetric.getMetricValues());
- otherMetric.setStartTime(timelineMetric.getStartTime());
- otherMetric.setHostName(timelineMetric.getHostName());
- otherMetric.setInstanceId(timelineMetric.getInstanceId());
- otherMetric.setAppId(timelineMetric.getAppId());
- otherMetric.setMetricName(timelineMetric.getMetricName());
-
- return otherMetric;
- }
-
@Override
public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
throws SQLException, IOException {
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
index 4ec624a..b57f7e9 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
@@ -39,7 +39,6 @@ public class MetricsPaddingMethodTest {
timelineMetric.setMetricName("m1");
timelineMetric.setHostName("h1");
timelineMetric.setAppId("a1");
- timelineMetric.setTimestamp(now);
TreeMap<Long, Double> inputValues = new TreeMap<>();
inputValues.put(now - 1000, 1.0d);
inputValues.put(now - 2000, 2.0d);
@@ -67,7 +66,6 @@ public class MetricsPaddingMethodTest {
timelineMetric.setMetricName("m1");
timelineMetric.setHostName("h1");
timelineMetric.setAppId("a1");
- timelineMetric.setTimestamp(now);
TreeMap<Long, Double> inputValues = new TreeMap<>();
inputValues.put(now - 1000, 1.0d);
inputValues.put(now - 2000, 2.0d);
@@ -95,7 +93,6 @@ public class MetricsPaddingMethodTest {
timelineMetric.setMetricName("m1");
timelineMetric.setHostName("h1");
timelineMetric.setAppId("a1");
- timelineMetric.setTimestamp(now);
TreeMap<Long, Double> inputValues = new TreeMap<>();
inputValues.put(now, 0.0d);
inputValues.put(now - 1000, 1.0d);
@@ -123,7 +120,6 @@ public class MetricsPaddingMethodTest {
timelineMetric.setMetricName("m1");
timelineMetric.setHostName("h1");
timelineMetric.setAppId("a1");
- timelineMetric.setTimestamp(now);
TreeMap<Long, Double> inputValues = new TreeMap<>();
inputValues.put(now - 1000, 1.0d);
timelineMetric.setMetricValues(inputValues);
@@ -149,7 +145,6 @@ public class MetricsPaddingMethodTest {
timelineMetric.setMetricName("m1");
timelineMetric.setHostName("h1");
timelineMetric.setAppId("a1");
- timelineMetric.setTimestamp(now);
TreeMap<Long, Double> inputValues = new TreeMap<>();
inputValues.put(now - 1000, 1.0d);
timelineMetric.setMetricValues(inputValues);
@@ -173,7 +168,6 @@ public class MetricsPaddingMethodTest {
timelineMetric.setMetricName("m1");
timelineMetric.setHostName("h1");
timelineMetric.setAppId("a1");
- timelineMetric.setTimestamp(now);
TreeMap<Long, Double> inputValues = new TreeMap<>();
long seconds = 1000;
@@ -234,7 +228,6 @@ public class MetricsPaddingMethodTest {
timelineMetric.setMetricName("m1");
timelineMetric.setHostName("h1");
timelineMetric.setAppId("a1");
- timelineMetric.setTimestamp(now);
TreeMap<Long, Double> inputValues = new TreeMap<>();
inputValues.put(now - 100, 1.0d);
inputValues.put(now - 200, 2.0d);
--
To stop receiving notification emails like this one, please contact
avijayan@apache.org.