You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2017/09/27 22:03:12 UTC
[6/6] ambari git commit: AMBARI-22077 : Create maven module and
package structure for the anomaly detection engine. (Commit 2) (avijayan)
AMBARI-22077 : Create maven module and package structure for the anomaly detection engine. (Commit 2) (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4613b471
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4613b471
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4613b471
Branch: refs/heads/branch-3.0-ams
Commit: 4613b471e20257df7f1f732e9444d8a90c71d743
Parents: e33b545
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Wed Sep 27 15:02:56 2017 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Wed Sep 27 15:02:56 2017 -0700
----------------------------------------------------------------------
.../pom.xml | 205 ++++++++++
.../adservice/prototype/common/DataSeries.java | 38 ++
.../adservice/prototype/common/ResultSet.java | 43 +++
.../prototype/common/StatisticUtils.java | 62 +++
.../prototype/core/AmbariServerInterface.java | 121 ++++++
.../prototype/core/MetricKafkaProducer.java | 56 +++
.../prototype/core/MetricSparkConsumer.java | 239 ++++++++++++
.../core/MetricsCollectorInterface.java | 237 ++++++++++++
.../prototype/core/PointInTimeADSystem.java | 260 +++++++++++++
.../prototype/core/RFunctionInvoker.java | 222 +++++++++++
.../adservice/prototype/core/TrendADSystem.java | 317 ++++++++++++++++
.../adservice/prototype/core/TrendMetric.java | 33 ++
.../methods/AnomalyDetectionTechnique.java | 30 ++
.../prototype/methods/MetricAnomaly.java | 86 +++++
.../prototype/methods/ema/EmaModel.java | 131 +++++++
.../prototype/methods/ema/EmaModelLoader.java | 40 ++
.../prototype/methods/ema/EmaTechnique.java | 151 ++++++++
.../prototype/methods/hsdev/HsdevTechnique.java | 81 ++++
.../prototype/methods/kstest/KSTechnique.java | 101 +++++
.../MetricAnomalyDetectorTestInput.java | 126 ++++++
.../testing/utilities/MetricAnomalyTester.java | 168 ++++++++
.../utilities/TestMetricSeriesGenerator.java | 92 +++++
.../utilities/TestSeriesInputRequest.java | 88 +++++
.../src/main/resources/R-scripts/ema.R | 96 +++++
.../src/main/resources/R-scripts/hsdev.r | 67 ++++
.../src/main/resources/R-scripts/iforest.R | 52 +++
.../src/main/resources/R-scripts/kstest.r | 38 ++
.../src/main/resources/R-scripts/test.R | 85 +++++
.../src/main/resources/R-scripts/tukeys.r | 51 +++
.../src/main/resources/input-config.properties | 42 ++
.../spark/prototype/MetricAnomalyDetector.scala | 126 ++++++
.../spark/prototype/SparkPhoenixReader.scala | 78 ++++
.../adservice/prototype/TestEmaTechnique.java | 106 ++++++
.../prototype/TestRFunctionInvoker.java | 161 ++++++++
.../metrics/adservice/prototype/TestTukeys.java | 100 +++++
.../seriesgenerator/AbstractMetricSeries.java | 25 ++
.../seriesgenerator/DualBandMetricSeries.java | 88 +++++
.../MetricSeriesGeneratorFactory.java | 377 ++++++++++++++++++
.../MetricSeriesGeneratorTest.java | 101 +++++
.../seriesgenerator/MonotonicMetricSeries.java | 101 +++++
.../seriesgenerator/NormalMetricSeries.java | 81 ++++
.../SteadyWithTurbulenceMetricSeries.java | 115 ++++++
.../StepFunctionMetricSeries.java | 107 ++++++
.../seriesgenerator/UniformMetricSeries.java | 95 +++++
.../ambari-metrics-anomaly-detector/pom.xml | 205 ----------
.../prototype/common/DataSeries.java | 38 --
.../prototype/common/ResultSet.java | 43 ---
.../prototype/common/StatisticUtils.java | 62 ---
.../prototype/core/AmbariServerInterface.java | 121 ------
.../prototype/core/MetricKafkaProducer.java | 56 ---
.../prototype/core/MetricSparkConsumer.java | 239 ------------
.../core/MetricsCollectorInterface.java | 237 ------------
.../prototype/core/PointInTimeADSystem.java | 260 -------------
.../prototype/core/RFunctionInvoker.java | 222 -----------
.../prototype/core/TrendADSystem.java | 317 ----------------
.../prototype/core/TrendMetric.java | 33 --
.../methods/AnomalyDetectionTechnique.java | 32 --
.../prototype/methods/MetricAnomaly.java | 86 -----
.../prototype/methods/ema/EmaModel.java | 131 -------
.../prototype/methods/ema/EmaModelLoader.java | 46 ---
.../prototype/methods/ema/EmaTechnique.java | 151 --------
.../prototype/methods/hsdev/HsdevTechnique.java | 81 ----
.../prototype/methods/kstest/KSTechnique.java | 101 -----
.../MetricAnomalyDetectorTestInput.java | 126 ------
.../testing/utilities/MetricAnomalyTester.java | 166 --------
.../utilities/TestMetricSeriesGenerator.java | 92 -----
.../utilities/TestSeriesInputRequest.java | 88 -----
.../seriesgenerator/AbstractMetricSeries.java | 25 --
.../seriesgenerator/DualBandMetricSeries.java | 88 -----
.../MetricSeriesGeneratorFactory.java | 379 -------------------
.../seriesgenerator/MonotonicMetricSeries.java | 101 -----
.../seriesgenerator/NormalMetricSeries.java | 81 ----
.../SteadyWithTurbulenceMetricSeries.java | 115 ------
.../StepFunctionMetricSeries.java | 107 ------
.../seriesgenerator/UniformMetricSeries.java | 95 -----
.../src/main/resources/R-scripts/ema.R | 96 -----
.../src/main/resources/R-scripts/hsdev.r | 67 ----
.../src/main/resources/R-scripts/iforest.R | 52 ---
.../src/main/resources/R-scripts/kstest.r | 38 --
.../src/main/resources/R-scripts/test.R | 85 -----
.../src/main/resources/R-scripts/tukeys.r | 51 ---
.../src/main/resources/input-config.properties | 42 --
.../metrics/spark/MetricAnomalyDetector.scala | 127 -------
.../metrics/spark/SparkPhoenixReader.scala | 78 ----
.../prototype/TestEmaTechnique.java | 106 ------
.../prototype/TestRFunctionInvoker.java | 161 --------
.../alertservice/prototype/TestTukeys.java | 100 -----
.../MetricSeriesGeneratorTest.java | 101 -----
ambari-metrics/pom.xml | 2 +-
89 files changed, 5020 insertions(+), 5029 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
new file mode 100644
index 0000000..1a10f86
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
@@ -0,0 +1,205 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>ambari-metrics</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <version>2.0.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>ambari-metrics-anomaly-detection-service</artifactId>
+ <version>2.0.0.0-SNAPSHOT</version>
+ <properties>
+ <!-- NOTE(review): scala.version (2.10.4) and scala.binary.version (2.11) name different
+ Scala lines, and the dependencies below mix _2.10 artifacts (kafka_2.10, spark-sql_2.10,
+ spark-mllib_2.10, spark-streaming-kafka_2.10) with _2.11 ones (spark-streaming_2.11 and
+ spark-core / spark-mllib via ${scala.binary.version}), as well as Spark 1.3.0, 1.6.3 and 2.1.1.
+ Confirm which Scala and Spark versions are intended before relying on this classpath. -->
+ <scala.version>2.10.4</scala.version>
+ <scala.binary.version>2.11</scala.binary.version>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>scala-tools.org</id>
+ <name>Scala-Tools Maven2 Repository</name>
+ <url>http://scala-tools.org/repo-releases</url>
+ </repository>
+ </repositories>
+
+ <pluginRepositories>
+ <pluginRepository>
+ <id>scala-tools.org</id>
+ <name>Scala-Tools Maven2 Repository</name>
+ <url>http://scala-tools.org/repo-releases</url>
+ </pluginRepository>
+ </pluginRepositories>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <scalaVersion>${scala.version}</scalaVersion>
+ <args>
+ <arg>-target:jvm-1.5</arg>
+ </args>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <name>Ambari Metrics Anomaly Detection Service</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.5</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.github.lucarosellini.rJava</groupId>
+ <artifactId>JRI</artifactId>
+ <version>0.9-7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_2.11</artifactId>
+ <version>2.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.10</artifactId>
+ <version>0.10.1.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.mail</groupId>
+ <artifactId>mail</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jmx</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.10.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-json</artifactId>
+ <version>0.10.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka_2.10</artifactId>
+ <version>1.6.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.10</artifactId>
+ <version>1.6.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.phoenix</groupId>
+ <artifactId>phoenix-spark</artifactId>
+ <version>4.10.0-HBase-1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-mllib_2.10</artifactId>
+ <version>1.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ <version>4.10</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-metrics-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.2.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>2.1.1</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-mllib_${scala.binary.version}</artifactId>
+ <version>2.1.1</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/DataSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/DataSeries.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/DataSeries.java
new file mode 100644
index 0000000..54b402f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/DataSeries.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.common;
+
+import java.util.Arrays;
+
/**
 * A named series made of two arrays: timestamps ({@link #ts}) and values ({@link #values}).
 *
 * <p>All fields are public and mutable. The constructor stores the given array references
 * as-is (no defensive copies), and it does not check that the two arrays have equal length.
 */
public class DataSeries {

  /** Label of the series; printed first by {@link #toString()}. */
  public String seriesName;

  /** Timestamps of the series (stored by reference). */
  public double[] ts;

  /** Observed values of the series (stored by reference). */
  public double[] values;

  /**
   * Creates a series from the given name and arrays.
   *
   * @param seriesName label of the series (may be {@code null})
   * @param ts         timestamp array, stored by reference
   * @param values     value array, stored by reference
   */
  public DataSeries(String seriesName, double[] ts, double[] values) {
    this.values = values;
    this.ts = ts;
    this.seriesName = seriesName;
  }

  /**
   * Renders the name immediately followed by {@link Arrays#toString(double[])} of the
   * timestamps and then of the values, with no separators in between.
   */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder();
    text.append(seriesName);
    text.append(Arrays.toString(ts));
    text.append(Arrays.toString(values));
    return text.toString();
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/ResultSet.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/ResultSet.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/ResultSet.java
new file mode 100644
index 0000000..dd3038f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/ResultSet.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.common;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
/**
 * Holds a list of numeric columns and prints them row by row to standard output.
 *
 * <p>Each {@code double[]} in {@link #resultset} is one column; row {@code i} of the printed
 * table consists of element {@code i} of every column, in list order.
 */
public class ResultSet {

  /** Column data; the constructor replaces a {@code null} argument with an empty list. */
  public List<double[]> resultset;

  /**
   * Wraps the given columns (the list is stored by reference, not copied).
   *
   * @param resultset columns to hold; {@code null} is treated as an empty list
   */
  public ResultSet(List<double[]> resultset) {
    this.resultset = (resultset != null) ? resultset : new ArrayList<>();
  }

  /**
   * Prints {@code "Result : "} followed by one line per row. Each value on a row is followed
   * by a single space.
   *
   * <p>Columns may have different lengths: the row count is the length of the longest column,
   * and a column that has no element at a given row contributes nothing to that row. Null
   * columns are skipped. (Previously a shorter later column caused an
   * {@code ArrayIndexOutOfBoundsException}, and rows beyond the first column's length were
   * dropped.)
   */
  public void print() {
    System.out.println("Result : ");
    if (resultset == null) {
      // The field is public and may have been reassigned to null after construction.
      return;
    }

    int rowCount = 0;
    for (double[] column : resultset) {
      if (column != null) {
        rowCount = Math.max(rowCount, column.length);
      }
    }

    for (int row = 0; row < rowCount; row++) {
      for (double[] column : resultset) {
        if (column != null && row < column.length) {
          System.out.print(column[row] + " ");
        }
      }
      System.out.println();
    }
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java
new file mode 100644
index 0000000..7f0aed3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.common;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
/**
 * Descriptive-statistics helpers over {@code double[]} samples.
 *
 * <p>None of the methods modify the input array. For an empty input, {@link #mean},
 * {@link #sdev} and {@link #median} return {@code NaN}.
 */
public class StatisticUtils {

  /**
   * Returns the arithmetic mean of {@code values}.
   *
   * @param values sample values
   * @return the mean, or {@code NaN} when {@code values} is empty (0 / 0)
   */
  public static double mean(double[] values) {
    double sum = 0;
    for (double d : values) {
      sum += d;
    }
    return sum / values.length;
  }

  /**
   * Returns the sum of squared deviations from the mean.
   *
   * <p>NOTE: despite its name, the result is NOT divided by the sample size; {@link #sdev}
   * performs that division. The semantics are kept unchanged because other callers may
   * depend on them.
   *
   * @param values sample values
   * @return sum of {@code (x - mean)^2} over all values; {@code 0} for an empty array
   */
  public static double variance(double[] values) {
    double avg = mean(values);
    double sumOfSquares = 0;
    for (double d : values) {
      double diff = d - avg;
      sumOfSquares += diff * diff;
    }
    return sumOfSquares;
  }

  /**
   * Returns the standard deviation of {@code values}.
   *
   * @param values               sample values
   * @param useBesselsCorrection divide by {@code n - 1} (sample sdev) instead of {@code n}
   *                             (population sdev)
   * @return the standard deviation, or {@code NaN} when the divisor would be zero or
   *     negative (empty input, or a single value with Bessel's correction)
   */
  public static double sdev(double[] values, boolean useBesselsCorrection) {
    int n = useBesselsCorrection ? values.length - 1 : values.length;
    if (n <= 0) {
      // Previously an empty input with Bessel's correction produced -0.0 (division by -1).
      return Double.NaN;
    }
    return Math.sqrt(variance(values) / n);
  }

  /**
   * Returns the median of {@code values} without modifying the input (a sorted copy is used).
   * For an even count the two middle elements are averaged.
   *
   * @param values sample values
   * @return the median, or {@code NaN} for an empty array (previously this threw
   *     {@code ArrayIndexOutOfBoundsException})
   */
  public static double median(double[] values) {
    int n = values.length;
    if (n == 0) {
      return Double.NaN;
    }
    double[] sorted = Arrays.copyOf(values, n);
    Arrays.sort(sorted);

    if (n % 2 != 0) {
      return sorted[n / 2];
    }
    return (sorted[n / 2 - 1] + sorted[n / 2]) / 2;
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/AmbariServerInterface.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/AmbariServerInterface.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/AmbariServerInterface.java
new file mode 100644
index 0000000..920d758
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/AmbariServerInterface.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+public class AmbariServerInterface implements Serializable{
+
+ private static final Log LOG = LogFactory.getLog(AmbariServerInterface.class);
+
+ private String ambariServerHost;
+ private String clusterName;
+
+ public AmbariServerInterface(String ambariServerHost, String clusterName) {
+ this.ambariServerHost = ambariServerHost;
+ this.clusterName = clusterName;
+ }
+
+ public int getPointInTimeSensitivity() {
+
+ String url = constructUri("http", ambariServerHost, "8080", "/api/v1/clusters/" + clusterName + "/alert_definitions?fields=*");
+
+ URL obj = null;
+ BufferedReader in = null;
+
+ try {
+ obj = new URL(url);
+ HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+ con.setRequestMethod("GET");
+
+ String encoded = Base64.getEncoder().encodeToString(("admin:admin").getBytes(StandardCharsets.UTF_8));
+ con.setRequestProperty("Authorization", "Basic "+encoded);
+
+ int responseCode = con.getResponseCode();
+ LOG.info("Sending 'GET' request to URL : " + url);
+ LOG.info("Response Code : " + responseCode);
+
+ in = new BufferedReader(
+ new InputStreamReader(con.getInputStream()));
+
+ StringBuilder responseJsonSb = new StringBuilder();
+ String line;
+ while ((line = in.readLine()) != null) {
+ responseJsonSb.append(line);
+ }
+
+ JSONObject jsonObject = new JSONObject(responseJsonSb.toString());
+ JSONArray array = jsonObject.getJSONArray("items");
+ for(int i = 0 ; i < array.length() ; i++){
+ JSONObject alertDefn = array.getJSONObject(i).getJSONObject("AlertDefinition");
+ if (alertDefn.get("name") != null && alertDefn.get("name").equals("point_in_time_metrics_anomalies")) {
+ JSONObject sourceNode = alertDefn.getJSONObject("source");
+ JSONArray params = sourceNode.getJSONArray("parameters");
+ for(int j = 0 ; j < params.length() ; j++){
+ JSONObject param = params.getJSONObject(j);
+ if (param.get("name").equals("sensitivity")) {
+ return param.getInt("value");
+ }
+ }
+ break;
+ }
+ }
+
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ LOG.warn(e);
+ }
+ }
+ }
+
+ return -1;
+ }
+
+ private String constructUri(String protocol, String host, String port, String path) {
+ StringBuilder sb = new StringBuilder(protocol);
+ sb.append("://");
+ sb.append(host);
+ sb.append(":");
+ sb.append(port);
+ sb.append(path);
+ return sb.toString();
+ }
+
+// public static void main(String[] args) {
+// AmbariServerInterface ambariServerInterface = new AmbariServerInterface();
+// ambariServerInterface.getPointInTimeSensitivity("avijayan-ams-1.openstacklocal","c1");
+// }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricKafkaProducer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricKafkaProducer.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricKafkaProducer.java
new file mode 100644
index 0000000..167fbb3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricKafkaProducer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class MetricKafkaProducer {
+
+ Producer producer;
+ private static String topicName = "ambari-metrics-topic";
+
+ public MetricKafkaProducer(String kafkaServers) {
+ Properties configProperties = new Properties();
+ configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); //"avijayan-ams-2.openstacklocal:6667"
+ configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
+ configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.connect.json.JsonSerializer");
+ producer = new KafkaProducer(configProperties);
+ }
+
+ public void sendMetrics(TimelineMetrics timelineMetrics) throws InterruptedException, ExecutionException {
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ JsonNode jsonNode = objectMapper.valueToTree(timelineMetrics);
+ ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, JsonNode>(topicName,jsonNode);
+ Future<RecordMetadata> kafkaFuture = producer.send(rec);
+
+ System.out.println(kafkaFuture.isDone());
+ System.out.println(kafkaFuture.get().topic());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java
new file mode 100644
index 0000000..e8257e5
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+import scala.Tuple2;
+
+import java.util.*;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class MetricSparkConsumer {
+
+ private static final Log LOG = LogFactory.getLog(MetricSparkConsumer.class);
+ private static String groupId = "ambari-metrics-group";
+ private static String topicName = "ambari-metrics-topic";
+ private static int numThreads = 1;
+ private static long pitStartTime = System.currentTimeMillis();
+ private static long ksStartTime = pitStartTime;
+ private static long hdevStartTime = ksStartTime;
+ private static Set<Pattern> includeMetricPatterns = new HashSet<>();
+ private static Set<String> includedHosts = new HashSet<>();
+ private static Set<TrendMetric> trendMetrics = new HashSet<>();
+
+ public MetricSparkConsumer() {
+ }
+
+ public static Properties readProperties(String propertiesFile) {
+ try {
+ Properties properties = new Properties();
+ InputStream inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile);
+ if (inputStream == null) {
+ inputStream = new FileInputStream(propertiesFile);
+ }
+ properties.load(inputStream);
+ return properties;
+ } catch (IOException ioEx) {
+ LOG.error("Error reading properties file for jmeter");
+ return null;
+ }
+ }
+
+ public static void main(String[] args) throws InterruptedException {
+
+ if (args.length < 1) {
+ System.err.println("Usage: MetricSparkConsumer <input-config-file>");
+ System.exit(1);
+ }
+
+ Properties properties = readProperties(args[0]);
+
+ List<String> appIds = Arrays.asList(properties.getProperty("appIds").split(","));
+
+ String collectorHost = properties.getProperty("collectorHost");
+ String collectorPort = properties.getProperty("collectorPort");
+ String collectorProtocol = properties.getProperty("collectorProtocol");
+
+ String zkQuorum = properties.getProperty("zkQuorum");
+
+ double emaW = Double.parseDouble(properties.getProperty("emaW"));
+ double emaN = Double.parseDouble(properties.getProperty("emaN"));
+ int emaThreshold = Integer.parseInt(properties.getProperty("emaThreshold"));
+ double tukeysN = Double.parseDouble(properties.getProperty("tukeysN"));
+
+ long pitTestInterval = Long.parseLong(properties.getProperty("pointInTimeTestInterval"));
+ long pitTrainInterval = Long.parseLong(properties.getProperty("pointInTimeTrainInterval"));
+
+ long ksTestInterval = Long.parseLong(properties.getProperty("ksTestInterval"));
+ long ksTrainInterval = Long.parseLong(properties.getProperty("ksTrainInterval"));
+ int hsdevNhp = Integer.parseInt(properties.getProperty("hsdevNhp"));
+ long hsdevInterval = Long.parseLong(properties.getProperty("hsdevInterval"));
+
+ String ambariServerHost = properties.getProperty("ambariServerHost");
+ String clusterName = properties.getProperty("clusterName");
+
+ String includeMetricPatternStrings = properties.getProperty("includeMetricPatterns");
+ if (includeMetricPatternStrings != null && !includeMetricPatternStrings.isEmpty()) {
+ String[] patterns = includeMetricPatternStrings.split(",");
+ for (String p : patterns) {
+ LOG.info("Included Pattern : " + p);
+ includeMetricPatterns.add(Pattern.compile(p));
+ }
+ }
+
+ String includedHostList = properties.getProperty("hosts");
+ if (includedHostList != null && !includedHostList.isEmpty()) {
+ String[] hosts = includedHostList.split(",");
+ includedHosts.addAll(Arrays.asList(hosts));
+ }
+
+ MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort);
+
+ SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector");
+
+ JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
+
+ EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN, emaThreshold);
+ PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(metricsCollectorInterface,
+ tukeysN,
+ pitTestInterval,
+ pitTrainInterval,
+ ambariServerHost,
+ clusterName);
+
+ TrendADSystem trendADSystem = new TrendADSystem(metricsCollectorInterface,
+ ksTestInterval,
+ ksTrainInterval,
+ hsdevNhp);
+
+ Broadcast<EmaTechnique> emaTechniqueBroadcast = jssc.sparkContext().broadcast(emaTechnique);
+ Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast = jssc.sparkContext().broadcast(pointInTimeADSystem);
+ Broadcast<TrendADSystem> trendADSystemBroadcast = jssc.sparkContext().broadcast(trendADSystem);
+ Broadcast<MetricsCollectorInterface> metricsCollectorInterfaceBroadcast = jssc.sparkContext().broadcast(metricsCollectorInterface);
+ Broadcast<Set<Pattern>> includePatternBroadcast = jssc.sparkContext().broadcast(includeMetricPatterns);
+ Broadcast<Set<String>> includedHostBroadcast = jssc.sparkContext().broadcast(includedHosts);
+
+ JavaPairReceiverInputDStream<String, String> messages =
+ KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));
+
+ //Convert JSON string to TimelineMetrics.
+ JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() {
+ @Override
+ public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class);
+ return metrics;
+ }
+ });
+
+ timelineMetricsStream.print();
+
+ //Group TimelineMetric by AppId.
+ JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
+ timelineMetrics -> timelineMetrics.getMetrics().isEmpty() ? new Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics)
+ );
+
+ appMetricStream.print();
+
+ //Filter AppIds that are not needed.
+ JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
+ @Override
+ public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
+ return appIds.contains(appMetricTuple._1);
+ }
+ });
+
+ filteredAppMetricStream.print();
+
+ filteredAppMetricStream.foreachRDD(rdd -> {
+ rdd.foreach(
+ tuple2 -> {
+ long currentTime = System.currentTimeMillis();
+ EmaTechnique ema = emaTechniqueBroadcast.getValue();
+ if (currentTime > pitStartTime + pitTestInterval) {
+ LOG.info("Running Tukeys....");
+ pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime);
+ pitStartTime = pitStartTime + pitTestInterval;
+ }
+
+ if (currentTime > ksStartTime + ksTestInterval) {
+ LOG.info("Running KS Test....");
+ trendADSystemBroadcast.getValue().runKSTest(currentTime, trendMetrics);
+ ksStartTime = ksStartTime + ksTestInterval;
+ }
+
+ if (currentTime > hdevStartTime + hsdevInterval) {
+ LOG.info("Running HSdev Test....");
+ trendADSystemBroadcast.getValue().runHsdevMethod();
+ hdevStartTime = hdevStartTime + hsdevInterval;
+ }
+
+ TimelineMetrics metrics = tuple2._2();
+ for (TimelineMetric timelineMetric : metrics.getMetrics()) {
+
+ boolean includeHost = includedHostBroadcast.getValue().contains(timelineMetric.getHostName());
+ boolean includeMetric = false;
+ if (includeHost) {
+ if (includePatternBroadcast.getValue().isEmpty()) {
+ includeMetric = true;
+ }
+ for (Pattern p : includePatternBroadcast.getValue()) {
+ Matcher m = p.matcher(timelineMetric.getMetricName());
+ if (m.find()) {
+ includeMetric = true;
+ }
+ }
+ }
+
+ if (includeMetric) {
+ trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(),
+ timelineMetric.getHostName()));
+ List<MetricAnomaly> anomalies = ema.test(timelineMetric);
+ metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
+ }
+ }
+ });
+ });
+
+ jssc.start();
+ jssc.awaitTermination();
+ }
+}
+
+
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricsCollectorInterface.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricsCollectorInterface.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricsCollectorInterface.java
new file mode 100644
index 0000000..da3999a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricsCollectorInterface.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeMap;
+
+public class MetricsCollectorInterface implements Serializable {
+
+ private static String hostName = null;
+ private String instanceId = null;
+ public final static String serviceName = "anomaly-engine";
+ private String collectorHost;
+ private String protocol;
+ private String port;
+ private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
+ private static final Log LOG = LogFactory.getLog(MetricsCollectorInterface.class);
+ private static ObjectMapper mapper;
+ private final static ObjectReader timelineObjectReader;
+
+ static {
+ mapper = new ObjectMapper();
+ AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+ mapper.setAnnotationIntrospector(introspector);
+ mapper.getSerializationConfig()
+ .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+ timelineObjectReader = mapper.reader(TimelineMetrics.class);
+ }
+
+ public MetricsCollectorInterface(String collectorHost, String protocol, String port) {
+ this.collectorHost = collectorHost;
+ this.protocol = protocol;
+ this.port = port;
+ this.hostName = getDefaultLocalHostName();
+ }
+
+ public static String getDefaultLocalHostName() {
+
+ if (hostName != null) {
+ return hostName;
+ }
+
+ try {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ LOG.info("Error getting host address");
+ }
+ return null;
+ }
+
+ public void publish(List<MetricAnomaly> metricAnomalies) {
+ if (CollectionUtils.isNotEmpty(metricAnomalies)) {
+ LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
+ List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
+ if (!metricList.isEmpty()) {
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.setMetrics(metricList);
+ emitMetrics(timelineMetrics);
+ }
+ } else {
+ LOG.debug("No anomalies to send.");
+ }
+ }
+
+ private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
+ List<TimelineMetric> metrics = new ArrayList<>();
+
+ if (metricAnomalies.isEmpty()) {
+ return metrics;
+ }
+
+ for (MetricAnomaly anomaly : metricAnomalies) {
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(anomaly.getMetricKey());
+ timelineMetric.setAppId(serviceName + "-" + anomaly.getMethodType());
+ timelineMetric.setInstanceId(null);
+ timelineMetric.setHostName(getDefaultLocalHostName());
+ timelineMetric.setStartTime(anomaly.getTimestamp());
+ HashMap<String, String> metadata = new HashMap<>();
+ metadata.put("method", anomaly.getMethodType());
+ metadata.put("anomaly-score", String.valueOf(anomaly.getAnomalyScore()));
+ timelineMetric.setMetadata(metadata);
+ TreeMap<Long,Double> metricValues = new TreeMap<>();
+ metricValues.put(anomaly.getTimestamp(), anomaly.getMetricValue());
+ timelineMetric.setMetricValues(metricValues);
+
+ metrics.add(timelineMetric);
+ }
+ return metrics;
+ }
+
+ public boolean emitMetrics(TimelineMetrics metrics) {
+ String connectUrl = constructTimelineMetricUri();
+ String jsonData = null;
+ LOG.debug("EmitMetrics connectUrl = " + connectUrl);
+ try {
+ jsonData = mapper.writeValueAsString(metrics);
+ LOG.info(jsonData);
+ } catch (IOException e) {
+ LOG.error("Unable to parse metrics", e);
+ }
+ if (jsonData != null) {
+ return emitMetricsJson(connectUrl, jsonData);
+ }
+ return false;
+ }
+
+ private HttpURLConnection getConnection(String spec) throws IOException {
+ return (HttpURLConnection) new URL(spec).openConnection();
+ }
+
+ private boolean emitMetricsJson(String connectUrl, String jsonData) {
+ int timeout = 10000;
+ HttpURLConnection connection = null;
+ try {
+ if (connectUrl == null) {
+ throw new IOException("Unknown URL. Unable to connect to metrics collector.");
+ }
+ connection = getConnection(connectUrl);
+
+ connection.setRequestMethod("POST");
+ connection.setRequestProperty("Content-Type", "application/json");
+ connection.setRequestProperty("Connection", "Keep-Alive");
+ connection.setConnectTimeout(timeout);
+ connection.setReadTimeout(timeout);
+ connection.setDoOutput(true);
+
+ if (jsonData != null) {
+ try (OutputStream os = connection.getOutputStream()) {
+ os.write(jsonData.getBytes("UTF-8"));
+ }
+ }
+
+ int statusCode = connection.getResponseCode();
+
+ if (statusCode != 200) {
+ LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
+ "statusCode = " + statusCode);
+ } else {
+ LOG.info("Metrics posted to Collector " + connectUrl);
+ }
+ return true;
+ } catch (IOException ioe) {
+ LOG.error(ioe.getMessage());
+ }
+ return false;
+ }
+
+ private String constructTimelineMetricUri() {
+ StringBuilder sb = new StringBuilder(protocol);
+ sb.append("://");
+ sb.append(collectorHost);
+ sb.append(":");
+ sb.append(port);
+ sb.append(WS_V1_TIMELINE_METRICS);
+ return sb.toString();
+ }
+
+ public TimelineMetrics fetchMetrics(String metricName,
+ String appId,
+ String hostname,
+ long startime,
+ long endtime) {
+
+ String url = constructTimelineMetricUri() + "?metricNames=" + metricName + "&appId=" + appId +
+ "&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + endtime;
+ LOG.debug("Fetch metrics URL : " + url);
+
+ URL obj = null;
+ BufferedReader in = null;
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+ try {
+ obj = new URL(url);
+ HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+ con.setRequestMethod("GET");
+ int responseCode = con.getResponseCode();
+ LOG.debug("Sending 'GET' request to URL : " + url);
+ LOG.debug("Response Code : " + responseCode);
+
+ in = new BufferedReader(
+ new InputStreamReader(con.getInputStream()));
+ timelineMetrics = timelineObjectReader.readValue(in);
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ LOG.warn(e);
+ }
+ }
+ }
+
+ LOG.info("Fetched " + timelineMetrics.getMetrics().size() + " metrics.");
+ return timelineMetrics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java
new file mode 100644
index 0000000..0a2271a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+import org.apache.ambari.metrics.adservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaModel;
+import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Compares EMA anomalies against Tukeys' method (evaluated in R) over a recent test window and
+ * adjusts each tracked EMA model's sensitivity from the resulting precision/recall.
+ */
+public class PointInTimeADSystem implements Serializable {
+
+  private MetricsCollectorInterface metricsCollectorInterface;
+  private Map<String, Double> tukeysNMap;
+  private double defaultTukeysN = 3;
+
+  // Defaults only; the constructor always overwrites both intervals.
+  private long testIntervalMillis = 5*60*1000; //5mins
+  private long trainIntervalMillis = 15*60*1000; //15mins
+
+  private static final Log LOG = LogFactory.getLog(PointInTimeADSystem.class);
+
+  private AmbariServerInterface ambariServerInterface;
+  private int sensitivity = 50;
+  private int minSensitivity = 0;
+  private int maxSensitivity = 100;
+
+  public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN,
+                             long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) {
+    this.metricsCollectorInterface = metricsCollectorInterface;
+    this.defaultTukeysN = defaultTukeysN;
+    this.tukeysNMap = new HashMap<>();
+    this.testIntervalMillis = testIntervalMillis;
+    this.trainIntervalMillis = trainIntervalMillis;
+    this.ambariServerInterface = new AmbariServerInterface(ambariServerHost, clusterName);
+    LOG.info("Starting PointInTimeADSystem...");
+  }
+
+  /**
+   * Runs Tukeys for every EMA model tracked by {@code emaTechnique}. The test window is
+   * (startTime - testInterval, startTime] and the training window is the trainInterval before it.
+   * Tukeys anomalies are emitted to the collector, and each EMA model is nudged toward agreement.
+   *
+   * @param emaTechnique technique whose tracked EMA models are evaluated and updated
+   * @param startTime end of the test window, in epoch millis
+   */
+  public void runTukeysAndRefineEma(EmaTechnique emaTechnique, long startTime) {
+    LOG.info("Running Tukeys for test data interval [" + new Date(startTime - testIntervalMillis) + " : " + new Date(startTime) + "], with train data period [" + new Date(startTime - testIntervalMillis - trainIntervalMillis) + " : " + new Date(startTime - testIntervalMillis) + "]");
+
+    // Each sensitivity step scales the default Tukeys N by 5% (up for higher sensitivity).
+    int requiredSensitivity = ambariServerInterface.getPointInTimeSensitivity();
+    if (requiredSensitivity == -1 || requiredSensitivity == sensitivity) {
+      LOG.info("No change in sensitivity needed.");
+    } else {
+      LOG.info("Current tukey's N value = " + defaultTukeysN);
+      if (requiredSensitivity > sensitivity) {
+        int targetSensitivity = Math.min(maxSensitivity, requiredSensitivity);
+        while (sensitivity < targetSensitivity) {
+          defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.05;
+          sensitivity++;
+        }
+      } else {
+        int targetSensitivity = Math.max(minSensitivity, requiredSensitivity);
+        while (sensitivity > targetSensitivity) {
+          defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.05;
+          sensitivity--;
+        }
+      }
+      LOG.info("New tukey's N value = " + defaultTukeysN);
+    }
+
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    for (String metricKey : emaTechnique.getTrackedEmas().keySet()) {
+      LOG.info("EMA key = " + metricKey);
+      EmaModel emaModel = emaTechnique.getTrackedEmas().get(metricKey);
+      String metricName = emaModel.getMetricName();
+      String appId = emaModel.getAppId();
+      String hostname = emaModel.getHostname();
+
+      TimelineMetrics tukeysData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, startTime - (testIntervalMillis + trainIntervalMillis),
+        startTime);
+
+      if (tukeysData.getMetrics().isEmpty()) {
+        LOG.info("No metrics fetched for Tukeys, metricKey = " + metricKey);
+        continue;
+      }
+
+      // Split the fetched points into train (<= test window start) and test (after it).
+      List<Double> trainTsList = new ArrayList<>();
+      List<Double> trainDataList = new ArrayList<>();
+      List<Double> testTsList = new ArrayList<>();
+      List<Double> testDataList = new ArrayList<>();
+
+      for (TimelineMetric metric : tukeysData.getMetrics()) {
+        for (Long timestamp : metric.getMetricValues().keySet()) {
+          if (timestamp <= (startTime - testIntervalMillis)) {
+            trainDataList.add(metric.getMetricValues().get(timestamp));
+            trainTsList.add((double)timestamp);
+          } else {
+            testDataList.add(metric.getMetricValues().get(timestamp));
+            testTsList.add((double)timestamp);
+          }
+        }
+      }
+
+      if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+        LOG.info("Not enough train/test data to perform analysis.");
+        continue;
+      }
+
+      String tukeysTrainSeries = "tukeysTrainSeries";
+      double[] trainTs = new double[trainTsList.size()];
+      double[] trainData = new double[trainTsList.size()];
+      for (int i = 0; i < trainTs.length; i++) {
+        trainTs[i] = trainTsList.get(i);
+        trainData[i] = trainDataList.get(i);
+      }
+
+      String tukeysTestSeries = "tukeysTestSeries";
+      double[] testTs = new double[testTsList.size()];
+      double[] testData = new double[testTsList.size()];
+      for (int i = 0; i < testTs.length; i++) {
+        testTs[i] = testTsList.get(i);
+        testData[i] = testDataList.get(i);
+      }
+
+      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
+
+      DataSeries tukeysTrainData = new DataSeries(tukeysTrainSeries, trainTs, trainData);
+      DataSeries tukeysTestData = new DataSeries(tukeysTestSeries, testTs, testData);
+
+      // NOTE(review): N is captured per metric on first sight, so later sensitivity changes to
+      // defaultTukeysN do not reach metrics already in tukeysNMap - confirm this is intended.
+      if (!tukeysNMap.containsKey(metricKey)) {
+        tukeysNMap.put(metricKey, defaultTukeysN);
+      }
+
+      Map<String, String> configs = new HashMap<>();
+      configs.put("tukeys.n", String.valueOf(tukeysNMap.get(metricKey)));
+
+      ResultSet rs = RFunctionInvoker.tukeys(tukeysTrainData, tukeysTestData, configs);
+
+      List<TimelineMetric> tukeysMetrics = getAsTimelineMetric(rs, metricName, appId, hostname);
+      LOG.info("Tukeys anomalies size : " + tukeysMetrics.size());
+      TreeMap<Long, Double> tukeysMetricValues = new TreeMap<>();
+
+      for (TimelineMetric tukeysMetric : tukeysMetrics) {
+        tukeysMetricValues.putAll(tukeysMetric.getMetricValues());
+        timelineMetrics.addOrMergeTimelineMetric(tukeysMetric);
+      }
+
+      TimelineMetrics emaData = metricsCollectorInterface.fetchMetrics(metricKey, MetricsCollectorInterface.serviceName+"-ema", MetricsCollectorInterface.getDefaultLocalHostName(), startTime - testIntervalMillis, startTime);
+      TreeMap<Long, Double> emaMetricValues = new TreeMap<>();
+      if (!emaData.getMetrics().isEmpty()) {
+        emaMetricValues = emaData.getMetrics().get(0).getMetricValues();
+      }
+
+      LOG.info("Ema anomalies size : " + emaMetricValues.size());
+      // Confusion matrix treating Tukeys anomalies as ground truth for the EMA anomalies.
+      int tp = 0;
+      int tn = 0;
+      int fp = 0;
+      int fn = 0;
+
+      for (double ts : testTs) {
+        long timestamp = (long) ts;
+        if (tukeysMetricValues.containsKey(timestamp)) {
+          if (emaMetricValues.containsKey(timestamp)) {
+            tp++;
+          } else {
+            fn++;
+          }
+        } else {
+          if (emaMetricValues.containsKey(timestamp)) {
+            fp++;
+          } else {
+            tn++;
+          }
+        }
+      }
+
+      // A zero denominator yields NaN; NaN < 0.5 is false, so that case leaves the model unchanged.
+      double recall = (double) tp / (double) (tp + fn);
+      double precision = (double) tp / (double) (tp + fp);
+      LOG.info("----------------------------");
+      LOG.info("Precision Recall values for " + metricKey);
+      LOG.info("tp=" + tp + ", fp=" + fp + ", tn=" + tn + ", fn=" + fn);
+      LOG.info("----------------------------");
+
+      // NOTE(review): the log says 10% but updateModel receives 5 - confirm the argument's unit.
+      if (recall < 0.5) {
+        LOG.info("Increasing EMA sensitivity by 10%");
+        emaModel.updateModel(true, 5);
+      } else if (precision < 0.5) {
+        LOG.info("Decreasing EMA sensitivity by 10%");
+        emaModel.updateModel(false, 5);
+      }
+
+    }
+
+    if (emaTechnique.getTrackedEmas().isEmpty()){
+      LOG.info("No EMA Technique keys tracked!!!!");
+    }
+
+    if (!timelineMetrics.getMetrics().isEmpty()) {
+      metricsCollectorInterface.emitMetrics(timelineMetrics);
+    }
+  }
+
+  /**
+   * Converts the Tukeys R result (columns: timestamps, values, anomaly scores) into one
+   * single-point TimelineMetric per anomaly.
+   *
+   * @return the anomalies as timeline metrics; never {@code null} (empty when R gave no usable result)
+   */
+  private static List<TimelineMetric> getAsTimelineMetric(ResultSet result, String metricName, String appId, String hostname) {
+
+    List<TimelineMetric> timelineMetrics = new ArrayList<>();
+
+    if (result == null) {
+      LOG.info("ResultSet from R call is null!!");
+      return timelineMetrics;
+    }
+
+    if (result.resultset.size() > 0) {
+      // Columns 0..2 are read below; a shorter result cannot be interpreted.
+      if (result.resultset.size() < 3) {
+        LOG.info("ResultSet from R call has fewer than 3 columns, ignoring it.");
+        return timelineMetrics;
+      }
+      double[] ts = result.resultset.get(0);
+      double[] metrics = result.resultset.get(1);
+      double[] anomalyScore = result.resultset.get(2);
+      for (int i = 0; i < ts.length; i++) {
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(metricName + ":" + appId + ":" + hostname);
+        timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+        timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys");
+        timelineMetric.setInstanceId(null);
+        timelineMetric.setStartTime((long) ts[i]);
+        TreeMap<Long, Double> metricValues = new TreeMap<>();
+        metricValues.put((long) ts[i], metrics[i]);
+
+        HashMap<String, String> metadata = new HashMap<>();
+        metadata.put("method", "tukeys");
+        // String.valueOf(double) renders infinity as "Infinity", so test the value directly.
+        if (Double.isInfinite(anomalyScore[i])) {
+          LOG.info("Got anomalyScore = infinity for " + metricName + ":" + appId + ":" + hostname);
+        } else {
+          metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
+        }
+        timelineMetric.setMetadata(metadata);
+
+        timelineMetric.setMetricValues(metricValues);
+        timelineMetrics.add(timelineMetric);
+      }
+    }
+
+    return timelineMetrics;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java
new file mode 100644
index 0000000..8f1eba6
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+
+import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.adservice.prototype.common.ResultSet;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.rosuda.JRI.REXP;
+import org.rosuda.JRI.RVector;
+import org.rosuda.JRI.Rengine;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class RFunctionInvoker {
+
+ static final Log LOG = LogFactory.getLog(RFunctionInvoker.class);
+ public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
+ private static String rScriptDir = "/usr/lib/ambari-metrics-collector/R-scripts";
+
+ private static void loadDataSets(Rengine r, DataSeries trainData, DataSeries testData) {
+ r.assign("train_ts", trainData.ts);
+ r.assign("train_x", trainData.values);
+ r.eval("train_data <- data.frame(train_ts,train_x)");
+ r.eval("names(train_data) <- c(\"TS\", " + trainData.seriesName + ")");
+
+ r.assign("test_ts", testData.ts);
+ r.assign("test_x", testData.values);
+ r.eval("test_data <- data.frame(test_ts,test_x)");
+ r.eval("names(test_data) <- c(\"TS\", " + testData.seriesName + ")");
+ }
+
+ public static void setScriptsDir(String dir) {
+ rScriptDir = dir;
+ }
+
+ public static ResultSet executeMethod(String methodType, DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+
+ ResultSet result;
+ switch (methodType) {
+ case "tukeys":
+ result = tukeys(trainData, testData, configs);
+ break;
+ case "ema":
+ result = ema_global(trainData, testData, configs);
+ break;
+ case "ks":
+ result = ksTest(trainData, testData, configs);
+ break;
+ case "hsdev":
+ result = hsdev(trainData, testData, configs);
+ break;
+ default:
+ result = tukeys(trainData, testData, configs);
+ break;
+ }
+ return result;
+ }
+
+ public static ResultSet tukeys(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+ try {
+
+ REXP exp1 = r.eval("source('" + rScriptDir + "/tukeys.r" + "')");
+
+ double n = Double.parseDouble(configs.get("tukeys.n"));
+ r.eval("n <- " + n);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ams_tukeys(train_data, test_data, n)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i < cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet ema_global(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+ try {
+ r.eval("source('" + rScriptDir + "/ema.r" + "')");
+
+ int n = Integer.parseInt(configs.get("ema.n"));
+ r.eval("n <- " + n);
+
+ double w = Double.parseDouble(configs.get("ema.w"));
+ r.eval("w <- " + w);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ema_global(train_data, test_data, w, n)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i < cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet ema_daily(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+ try {
+ r.eval("source('" + rScriptDir + "/ema.r" + "')");
+
+ int n = Integer.parseInt(configs.get("ema.n"));
+ r.eval("n <- " + n);
+
+ double w = Double.parseDouble(configs.get("ema.w"));
+ r.eval("w <- " + w);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ema_daily(train_data, test_data, w, n)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i < cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet ksTest(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+ try {
+ r.eval("source('" + rScriptDir + "/kstest.r" + "')");
+
+ double p_value = Double.parseDouble(configs.get("ks.p_value"));
+ r.eval("p_value <- " + p_value);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ams_ks(train_data, test_data, p_value)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i < cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet hsdev(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+ try {
+ r.eval("source('" + rScriptDir + "/hsdev.r" + "')");
+
+ int n = Integer.parseInt(configs.get("hsdev.n"));
+ r.eval("n <- " + n);
+
+ int nhp = Integer.parseInt(configs.get("hsdev.nhp"));
+ r.eval("nhp <- " + nhp);
+
+ long interval = Long.parseLong(configs.get("hsdev.interval"));
+ r.eval("interval <- " + interval);
+
+ long period = Long.parseLong(configs.get("hsdev.period"));
+ r.eval("period <- " + period);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)");
+ REXP exp = r.eval("an2");
+ RVector cont = (RVector) exp.getContent();
+
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i < cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java
new file mode 100644
index 0000000..f5ec83a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.adservice.prototype.methods.hsdev.HsdevTechnique;
+import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.adservice.prototype.methods.kstest.KSTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+public class TrendADSystem implements Serializable {
+
+ private MetricsCollectorInterface metricsCollectorInterface;
+ private List<TrendMetric> trendMetrics;
+
+ private long ksTestIntervalMillis = 10 * 60 * 1000;
+ private long ksTrainIntervalMillis = 10 * 60 * 1000;
+ private KSTechnique ksTechnique;
+
+ private HsdevTechnique hsdevTechnique;
+ private int hsdevNumHistoricalPeriods = 3;
+
+ private Map<KsSingleRunKey, MetricAnomaly> trackedKsAnomalies = new HashMap<>();
+ private static final Log LOG = LogFactory.getLog(TrendADSystem.class);
+ private String inputFile = "";
+
+ public TrendADSystem(MetricsCollectorInterface metricsCollectorInterface,
+ long ksTestIntervalMillis,
+ long ksTrainIntervalMillis,
+ int hsdevNumHistoricalPeriods) {
+
+ this.metricsCollectorInterface = metricsCollectorInterface;
+ this.ksTestIntervalMillis = ksTestIntervalMillis;
+ this.ksTrainIntervalMillis = ksTrainIntervalMillis;
+ this.hsdevNumHistoricalPeriods = hsdevNumHistoricalPeriods;
+
+ this.ksTechnique = new KSTechnique();
+ this.hsdevTechnique = new HsdevTechnique();
+
+ trendMetrics = new ArrayList<>();
+ }
+
+ public void runKSTest(long currentEndTime, Set<TrendMetric> trendMetrics) {
+ readInputFile(inputFile);
+
+ long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis;
+ LOG.info("Running KS Test for test data interval [" + new Date(ksTestIntervalStartTime) + " : " +
+ new Date(currentEndTime) + "], with train data period [" + new Date(ksTestIntervalStartTime - ksTrainIntervalMillis)
+ + " : " + new Date(ksTestIntervalStartTime) + "]");
+
+ for (TrendMetric metric : trendMetrics) {
+ String metricName = metric.metricName;
+ String appId = metric.appId;
+ String hostname = metric.hostname;
+ String key = metricName + ":" + appId + ":" + hostname;
+
+ TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, ksTestIntervalStartTime - ksTrainIntervalMillis,
+ currentEndTime);
+
+ if (ksData.getMetrics().isEmpty()) {
+ LOG.info("No metrics fetched for KS, metricKey = " + key);
+ continue;
+ }
+
+ List<Double> trainTsList = new ArrayList<>();
+ List<Double> trainDataList = new ArrayList<>();
+ List<Double> testTsList = new ArrayList<>();
+ List<Double> testDataList = new ArrayList<>();
+
+ for (TimelineMetric timelineMetric : ksData.getMetrics()) {
+ for (Long timestamp : timelineMetric.getMetricValues().keySet()) {
+ if (timestamp <= ksTestIntervalStartTime) {
+ trainDataList.add(timelineMetric.getMetricValues().get(timestamp));
+ trainTsList.add((double) timestamp);
+ } else {
+ testDataList.add(timelineMetric.getMetricValues().get(timestamp));
+ testTsList.add((double) timestamp);
+ }
+ }
+ }
+
+ LOG.info("Train Data size : " + trainDataList.size() + ", Test Data Size : " + testDataList.size());
+ if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+ LOG.info("Not enough train/test data to perform KS analysis.");
+ continue;
+ }
+
+ String ksTrainSeries = "KSTrainSeries";
+ double[] trainTs = new double[trainTsList.size()];
+ double[] trainData = new double[trainTsList.size()];
+ for (int i = 0; i < trainTs.length; i++) {
+ trainTs[i] = trainTsList.get(i);
+ trainData[i] = trainDataList.get(i);
+ }
+
+ String ksTestSeries = "KSTestSeries";
+ double[] testTs = new double[testTsList.size()];
+ double[] testData = new double[testTsList.size()];
+ for (int i = 0; i < testTs.length; i++) {
+ testTs[i] = testTsList.get(i);
+ testData[i] = testDataList.get(i);
+ }
+
+ LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
+
+ DataSeries ksTrainData = new DataSeries(ksTrainSeries, trainTs, trainData);
+ DataSeries ksTestData = new DataSeries(ksTestSeries, testTs, testData);
+
+ MetricAnomaly metricAnomaly = ksTechnique.runKsTest(key, ksTrainData, ksTestData);
+ if (metricAnomaly == null) {
+ LOG.info("No anomaly from KS test.");
+ } else {
+ LOG.info("Found Anomaly in KS Test. Publishing KS Anomaly metric....");
+ TimelineMetric timelineMetric = getAsTimelineMetric(metricAnomaly,
+ ksTestIntervalStartTime, currentEndTime, ksTestIntervalStartTime - ksTrainIntervalMillis, ksTestIntervalStartTime);
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+ metricsCollectorInterface.emitMetrics(timelineMetrics);
+
+ trackedKsAnomalies.put(new KsSingleRunKey(ksTestIntervalStartTime, currentEndTime, metricName, appId, hostname), metricAnomaly);
+ }
+ }
+
+ if (trendMetrics.isEmpty()) {
+ LOG.info("No Trend metrics tracked!!!!");
+ }
+
+ }
+
+ private TimelineMetric getAsTimelineMetric(MetricAnomaly metricAnomaly,
+ long testStart,
+ long testEnd,
+ long trainStart,
+ long trainEnd) {
+
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(metricAnomaly.getMetricKey());
+ timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-" + metricAnomaly.getMethodType());
+ timelineMetric.setInstanceId(null);
+ timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+ timelineMetric.setStartTime(testEnd);
+ HashMap<String, String> metadata = new HashMap<>();
+ metadata.put("method", metricAnomaly.getMethodType());
+ metadata.put("anomaly-score", String.valueOf(metricAnomaly.getAnomalyScore()));
+ metadata.put("test-start-time", String.valueOf(testStart));
+ metadata.put("train-start-time", String.valueOf(trainStart));
+ metadata.put("train-end-time", String.valueOf(trainEnd));
+ timelineMetric.setMetadata(metadata);
+ TreeMap<Long,Double> metricValues = new TreeMap<>();
+ metricValues.put(testEnd, metricAnomaly.getMetricValue());
+ timelineMetric.setMetricValues(metricValues);
+ return timelineMetric;
+
+ }
+
+ public void runHsdevMethod() {
+
+ List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>();
+
+ for (KsSingleRunKey ksSingleRunKey : trackedKsAnomalies.keySet()) {
+
+ long hsdevTestEnd = ksSingleRunKey.endTime;
+ long hsdevTestStart = ksSingleRunKey.startTime;
+
+ long period = hsdevTestEnd - hsdevTestStart;
+
+ long hsdevTrainStart = hsdevTestStart - (hsdevNumHistoricalPeriods) * period;
+ long hsdevTrainEnd = hsdevTestStart;
+
+ LOG.info("Running HSdev Test for test data interval [" + new Date(hsdevTestStart) + " : " +
+ new Date(hsdevTestEnd) + "], with train data period [" + new Date(hsdevTrainStart)
+ + " : " + new Date(hsdevTrainEnd) + "]");
+
+ String metricName = ksSingleRunKey.metricName;
+ String appId = ksSingleRunKey.appId;
+ String hostname = ksSingleRunKey.hostname;
+ String key = metricName + "_" + appId + "_" + hostname;
+
+ TimelineMetrics hsdevData = metricsCollectorInterface.fetchMetrics(
+ metricName,
+ appId,
+ hostname,
+ hsdevTrainStart,
+ hsdevTestEnd);
+
+ if (hsdevData.getMetrics().isEmpty()) {
+ LOG.info("No metrics fetched for HSDev, metricKey = " + key);
+ continue;
+ }
+
+ List<Double> trainTsList = new ArrayList<>();
+ List<Double> trainDataList = new ArrayList<>();
+ List<Double> testTsList = new ArrayList<>();
+ List<Double> testDataList = new ArrayList<>();
+
+ for (TimelineMetric timelineMetric : hsdevData.getMetrics()) {
+ for (Long timestamp : timelineMetric.getMetricValues().keySet()) {
+ if (timestamp <= hsdevTestStart) {
+ trainDataList.add(timelineMetric.getMetricValues().get(timestamp));
+ trainTsList.add((double) timestamp);
+ } else {
+ testDataList.add(timelineMetric.getMetricValues().get(timestamp));
+ testTsList.add((double) timestamp);
+ }
+ }
+ }
+
+ if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+ LOG.info("Not enough train/test data to perform Hsdev analysis.");
+ continue;
+ }
+
+ String hsdevTrainSeries = "HsdevTrainSeries";
+ double[] trainTs = new double[trainTsList.size()];
+ double[] trainData = new double[trainTsList.size()];
+ for (int i = 0; i < trainTs.length; i++) {
+ trainTs[i] = trainTsList.get(i);
+ trainData[i] = trainDataList.get(i);
+ }
+
+ String hsdevTestSeries = "HsdevTestSeries";
+ double[] testTs = new double[testTsList.size()];
+ double[] testData = new double[testTsList.size()];
+ for (int i = 0; i < testTs.length; i++) {
+ testTs[i] = testTsList.get(i);
+ testData[i] = testDataList.get(i);
+ }
+
+ LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
+
+ DataSeries hsdevTrainData = new DataSeries(hsdevTrainSeries, trainTs, trainData);
+ DataSeries hsdevTestData = new DataSeries(hsdevTestSeries, testTs, testData);
+
+ MetricAnomaly metricAnomaly = hsdevTechnique.runHsdevTest(key, hsdevTrainData, hsdevTestData);
+ if (metricAnomaly == null) {
+ LOG.info("No anomaly from Hsdev test. Mismatch between KS and HSDev. ");
+ ksTechnique.updateModel(key, false, 10);
+ } else {
+ LOG.info("Found Anomaly in Hsdev Test. This confirms KS anomaly.");
+ hsdevMetricAnomalies.add(getAsTimelineMetric(metricAnomaly,
+ hsdevTestStart, hsdevTestEnd, hsdevTrainStart, hsdevTrainEnd));
+ }
+ }
+ clearTrackedKsRunKeys();
+
+ if (!hsdevMetricAnomalies.isEmpty()) {
+ LOG.info("Publishing Hsdev Anomalies....");
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.setMetrics(hsdevMetricAnomalies);
+ metricsCollectorInterface.emitMetrics(timelineMetrics);
+ }
+ }
+
+ private void clearTrackedKsRunKeys() {
+ trackedKsAnomalies.clear();
+ }
+
+ private void readInputFile(String fileName) {
+ trendMetrics.clear();
+ try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
+ for (String line; (line = br.readLine()) != null; ) {
+ String[] splits = line.split(",");
+ LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]);
+ trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2]));
+ }
+ } catch (IOException e) {
+ LOG.error("Error reading input file : " + e);
+ }
+ }
+
+ class KsSingleRunKey implements Serializable{
+
+ long startTime;
+ long endTime;
+ String metricName;
+ String appId;
+ String hostname;
+
+ public KsSingleRunKey(long startTime, long endTime, String metricName, String appId, String hostname) {
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.metricName = metricName;
+ this.appId = appId;
+ this.hostname = hostname;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java
new file mode 100644
index 0000000..d4db227
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+import java.io.Serializable;
+
+public class TrendMetric implements Serializable {
+
+ String metricName;
+ String appId;
+ String hostname;
+
+ public TrendMetric(String metricName, String appId, String hostname) {
+ this.metricName = metricName;
+ this.appId = appId;
+ this.hostname = hostname;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java
new file mode 100644
index 0000000..c19adda
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.methods;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.util.List;
+
+public abstract class AnomalyDetectionTechnique {
+
+ protected String methodType;
+
+ public abstract List<MetricAnomaly> test(TimelineMetric metric);
+
+}