Posted to commits@ambari.apache.org by av...@apache.org on 2017/09/27 22:03:12 UTC

[6/6] ambari git commit: AMBARI-22077 : Create maven module and package structure for the anomaly detection engine. (Commit 2) (avijayan)

AMBARI-22077 : Create maven module and package structure for the anomaly detection engine. (Commit 2) (avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4613b471
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4613b471
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4613b471

Branch: refs/heads/branch-3.0-ams
Commit: 4613b471e20257df7f1f732e9444d8a90c71d743
Parents: e33b545
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Wed Sep 27 15:02:56 2017 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Wed Sep 27 15:02:56 2017 -0700

----------------------------------------------------------------------
 .../pom.xml                                     | 205 ++++++++++
 .../adservice/prototype/common/DataSeries.java  |  38 ++
 .../adservice/prototype/common/ResultSet.java   |  43 +++
 .../prototype/common/StatisticUtils.java        |  62 +++
 .../prototype/core/AmbariServerInterface.java   | 121 ++++++
 .../prototype/core/MetricKafkaProducer.java     |  56 +++
 .../prototype/core/MetricSparkConsumer.java     | 239 ++++++++++++
 .../core/MetricsCollectorInterface.java         | 237 ++++++++++++
 .../prototype/core/PointInTimeADSystem.java     | 260 +++++++++++++
 .../prototype/core/RFunctionInvoker.java        | 222 +++++++++++
 .../adservice/prototype/core/TrendADSystem.java | 317 ++++++++++++++++
 .../adservice/prototype/core/TrendMetric.java   |  33 ++
 .../methods/AnomalyDetectionTechnique.java      |  30 ++
 .../prototype/methods/MetricAnomaly.java        |  86 +++++
 .../prototype/methods/ema/EmaModel.java         | 131 +++++++
 .../prototype/methods/ema/EmaModelLoader.java   |  40 ++
 .../prototype/methods/ema/EmaTechnique.java     | 151 ++++++++
 .../prototype/methods/hsdev/HsdevTechnique.java |  81 ++++
 .../prototype/methods/kstest/KSTechnique.java   | 101 +++++
 .../MetricAnomalyDetectorTestInput.java         | 126 ++++++
 .../testing/utilities/MetricAnomalyTester.java  | 168 ++++++++
 .../utilities/TestMetricSeriesGenerator.java    |  92 +++++
 .../utilities/TestSeriesInputRequest.java       |  88 +++++
 .../src/main/resources/R-scripts/ema.R          |  96 +++++
 .../src/main/resources/R-scripts/hsdev.r        |  67 ++++
 .../src/main/resources/R-scripts/iforest.R      |  52 +++
 .../src/main/resources/R-scripts/kstest.r       |  38 ++
 .../src/main/resources/R-scripts/test.R         |  85 +++++
 .../src/main/resources/R-scripts/tukeys.r       |  51 +++
 .../src/main/resources/input-config.properties  |  42 ++
 .../spark/prototype/MetricAnomalyDetector.scala | 126 ++++++
 .../spark/prototype/SparkPhoenixReader.scala    |  78 ++++
 .../adservice/prototype/TestEmaTechnique.java   | 106 ++++++
 .../prototype/TestRFunctionInvoker.java         | 161 ++++++++
 .../metrics/adservice/prototype/TestTukeys.java | 100 +++++
 .../seriesgenerator/AbstractMetricSeries.java   |  25 ++
 .../seriesgenerator/DualBandMetricSeries.java   |  88 +++++
 .../MetricSeriesGeneratorFactory.java           | 377 ++++++++++++++++++
 .../MetricSeriesGeneratorTest.java              | 101 +++++
 .../seriesgenerator/MonotonicMetricSeries.java  | 101 +++++
 .../seriesgenerator/NormalMetricSeries.java     |  81 ++++
 .../SteadyWithTurbulenceMetricSeries.java       | 115 ++++++
 .../StepFunctionMetricSeries.java               | 107 ++++++
 .../seriesgenerator/UniformMetricSeries.java    |  95 +++++
 .../ambari-metrics-anomaly-detector/pom.xml     | 205 ----------
 .../prototype/common/DataSeries.java            |  38 --
 .../prototype/common/ResultSet.java             |  43 ---
 .../prototype/common/StatisticUtils.java        |  62 ---
 .../prototype/core/AmbariServerInterface.java   | 121 ------
 .../prototype/core/MetricKafkaProducer.java     |  56 ---
 .../prototype/core/MetricSparkConsumer.java     | 239 ------------
 .../core/MetricsCollectorInterface.java         | 237 ------------
 .../prototype/core/PointInTimeADSystem.java     | 260 -------------
 .../prototype/core/RFunctionInvoker.java        | 222 -----------
 .../prototype/core/TrendADSystem.java           | 317 ----------------
 .../prototype/core/TrendMetric.java             |  33 --
 .../methods/AnomalyDetectionTechnique.java      |  32 --
 .../prototype/methods/MetricAnomaly.java        |  86 -----
 .../prototype/methods/ema/EmaModel.java         | 131 -------
 .../prototype/methods/ema/EmaModelLoader.java   |  46 ---
 .../prototype/methods/ema/EmaTechnique.java     | 151 --------
 .../prototype/methods/hsdev/HsdevTechnique.java |  81 ----
 .../prototype/methods/kstest/KSTechnique.java   | 101 -----
 .../MetricAnomalyDetectorTestInput.java         | 126 ------
 .../testing/utilities/MetricAnomalyTester.java  | 166 --------
 .../utilities/TestMetricSeriesGenerator.java    |  92 -----
 .../utilities/TestSeriesInputRequest.java       |  88 -----
 .../seriesgenerator/AbstractMetricSeries.java   |  25 --
 .../seriesgenerator/DualBandMetricSeries.java   |  88 -----
 .../MetricSeriesGeneratorFactory.java           | 379 -------------------
 .../seriesgenerator/MonotonicMetricSeries.java  | 101 -----
 .../seriesgenerator/NormalMetricSeries.java     |  81 ----
 .../SteadyWithTurbulenceMetricSeries.java       | 115 ------
 .../StepFunctionMetricSeries.java               | 107 ------
 .../seriesgenerator/UniformMetricSeries.java    |  95 -----
 .../src/main/resources/R-scripts/ema.R          |  96 -----
 .../src/main/resources/R-scripts/hsdev.r        |  67 ----
 .../src/main/resources/R-scripts/iforest.R      |  52 ---
 .../src/main/resources/R-scripts/kstest.r       |  38 --
 .../src/main/resources/R-scripts/test.R         |  85 -----
 .../src/main/resources/R-scripts/tukeys.r       |  51 ---
 .../src/main/resources/input-config.properties  |  42 --
 .../metrics/spark/MetricAnomalyDetector.scala   | 127 -------
 .../metrics/spark/SparkPhoenixReader.scala      |  78 ----
 .../prototype/TestEmaTechnique.java             | 106 ------
 .../prototype/TestRFunctionInvoker.java         | 161 --------
 .../alertservice/prototype/TestTukeys.java      | 100 -----
 .../MetricSeriesGeneratorTest.java              | 101 -----
 ambari-metrics/pom.xml                          |   2 +-
 89 files changed, 5020 insertions(+), 5029 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
new file mode 100644
index 0000000..1a10f86
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
@@ -0,0 +1,205 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>ambari-metrics</artifactId>
+        <groupId>org.apache.ambari</groupId>
+        <version>2.0.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>ambari-metrics-anomaly-detection-service</artifactId>
+    <version>2.0.0.0-SNAPSHOT</version>
+    <properties>
+        <scala.version>2.10.4</scala.version>
+        <scala.binary.version>2.11</scala.binary.version>
+    </properties>
+
+    <repositories>
+        <repository>
+            <id>scala-tools.org</id>
+            <name>Scala-Tools Maven2 Repository</name>
+            <url>http://scala-tools.org/repo-releases</url>
+        </repository>
+    </repositories>
+
+    <pluginRepositories>
+        <pluginRepository>
+            <id>scala-tools.org</id>
+            <name>Scala-Tools Maven2 Repository</name>
+            <url>http://scala-tools.org/repo-releases</url>
+        </pluginRepository>
+    </pluginRepositories>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <args>
+                        <arg>-target:jvm-1.5</arg>
+                    </args>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <name>Ambari Metrics Anomaly Detection Service</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <version>2.5</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.lucarosellini.rJava</groupId>
+            <artifactId>JRI</artifactId>
+            <version>0.9-7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_2.11</artifactId>
+            <version>2.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>0.10.1.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.mail</groupId>
+                    <artifactId>mail</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jmx</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-json</artifactId>
+            <version>0.10.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming-kafka_2.10</artifactId>
+            <version>1.6.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.10</artifactId>
+            <version>1.6.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.phoenix</groupId>
+            <artifactId>phoenix-spark</artifactId>
+            <version>4.10.0-HBase-1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_2.10</artifactId>
+            <version>1.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+            <version>4.10</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ambari</groupId>
+            <artifactId>ambari-metrics-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>2.1.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
+            <version>2.1.1</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/DataSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/DataSeries.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/DataSeries.java
new file mode 100644
index 0000000..54b402f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/DataSeries.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.common;
+
+import java.util.Arrays;
+
+public class DataSeries {
+
+    public String seriesName;
+    public double[] ts;
+    public double[] values;
+
+    public DataSeries(String seriesName, double[] ts, double[] values) {
+        this.seriesName = seriesName;
+        this.ts = ts;
+        this.values = values;
+    }
+
+    @Override
+    public String toString() {
+        return seriesName + Arrays.toString(ts) + Arrays.toString(values);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/ResultSet.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/ResultSet.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/ResultSet.java
new file mode 100644
index 0000000..dd3038f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/ResultSet.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.common;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ResultSet {
+
+    public List<double[]> resultset = new ArrayList<>();
+
+    public ResultSet(List<double[]> resultset) {
+        this.resultset = resultset;
+    }
+
+    public void print() {
+        System.out.println("Result : ");
+        if (!resultset.isEmpty()) {
+            for (int i = 0; i<resultset.get(0).length;i++) {
+                for (double[] entity : resultset) {
+                    System.out.print(entity[i] + " ");
+                }
+                System.out.println();
+            }
+        }
+    }
+}
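
For reference, a minimal usage sketch of the DataSeries and ResultSet holders introduced above; the series name, timestamps and values are illustrative:

import java.util.Arrays;

import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
import org.apache.ambari.metrics.adservice.prototype.common.ResultSet;

public class CommonTypesExample {
  public static void main(String[] args) {
    // A named series of (timestamp, value) pairs kept as two parallel arrays.
    DataSeries series = new DataSeries("illustrative.metric",
        new double[]{1.0, 2.0, 3.0}, new double[]{10.0, 20.0, 30.0});
    System.out.println(series);

    // A ResultSet is a list of equally sized columns; print() writes it out row by row.
    ResultSet rs = new ResultSet(Arrays.asList(series.ts, series.values));
    rs.print();
  }
}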

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java
new file mode 100644
index 0000000..7f0aed3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/common/StatisticUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.common;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+public class StatisticUtils {
+
+  public static double mean(double[] values) {
+    double sum = 0;
+    for (double d : values) {
+      sum += d;
+    }
+    return sum / values.length;
+  }
+
+  public static double variance(double[] values) {
+    double avg =  mean(values);
+    double variance = 0;
+    for (double d : values) {
+      variance += Math.pow(d - avg, 2.0);
+    }
+    return variance;
+  }
+
+  public static double sdev(double[]  values, boolean useBesselsCorrection) {
+    double variance = variance(values);
+    int n = (useBesselsCorrection) ? values.length - 1 : values.length;
+    return Math.sqrt(variance / n);
+  }
+
+  public static double median(double[] values) {
+    double[] clonedValues = Arrays.copyOf(values, values.length);
+    Arrays.sort(clonedValues);
+    int n = values.length;
+
+    if (n % 2 != 0) {
+      return clonedValues[(n-1)/2];
+    } else {
+      return ( clonedValues[(n-1)/2] + clonedValues[n/2] ) / 2;
+    }
+  }
+}
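
A small usage sketch of the StatisticUtils helpers added above; the input values are made up:

import org.apache.ambari.metrics.adservice.prototype.common.StatisticUtils;

public class StatisticUtilsExample {
  public static void main(String[] args) {
    double[] values = {4.0, 8.0, 6.0, 5.0, 3.0};

    System.out.println("mean   = " + StatisticUtils.mean(values));
    // Passing true applies Bessel's correction (divide by n - 1) when computing the deviation.
    System.out.println("sdev   = " + StatisticUtils.sdev(values, true));
    System.out.println("median = " + StatisticUtils.median(values));
  }
}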

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/AmbariServerInterface.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/AmbariServerInterface.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/AmbariServerInterface.java
new file mode 100644
index 0000000..920d758
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/AmbariServerInterface.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+public class AmbariServerInterface implements Serializable {
+
+  private static final Log LOG = LogFactory.getLog(AmbariServerInterface.class);
+
+  private String ambariServerHost;
+  private String clusterName;
+
+  public AmbariServerInterface(String ambariServerHost, String clusterName) {
+    this.ambariServerHost = ambariServerHost;
+    this.clusterName = clusterName;
+  }
+
+  public int getPointInTimeSensitivity() {
+
+    String url = constructUri("http", ambariServerHost, "8080", "/api/v1/clusters/" + clusterName + "/alert_definitions?fields=*");
+
+    URL obj = null;
+    BufferedReader in = null;
+
+    try {
+      obj = new URL(url);
+      HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+      con.setRequestMethod("GET");
+
+      String encoded = Base64.getEncoder().encodeToString(("admin:admin").getBytes(StandardCharsets.UTF_8));
+      con.setRequestProperty("Authorization", "Basic "+encoded);
+
+      int responseCode = con.getResponseCode();
+      LOG.info("Sending 'GET' request to URL : " + url);
+      LOG.info("Response Code : " + responseCode);
+
+      in = new BufferedReader(
+        new InputStreamReader(con.getInputStream()));
+
+      StringBuilder responseJsonSb = new StringBuilder();
+      String line;
+      while ((line = in.readLine()) != null) {
+        responseJsonSb.append(line);
+      }
+
+      JSONObject jsonObject = new JSONObject(responseJsonSb.toString());
+      JSONArray array = jsonObject.getJSONArray("items");
+      for(int i = 0 ; i < array.length() ; i++){
+        JSONObject alertDefn = array.getJSONObject(i).getJSONObject("AlertDefinition");
+        if (alertDefn.get("name") != null && alertDefn.get("name").equals("point_in_time_metrics_anomalies")) {
+          JSONObject sourceNode = alertDefn.getJSONObject("source");
+          JSONArray params = sourceNode.getJSONArray("parameters");
+          for(int j = 0 ; j < params.length() ; j++){
+            JSONObject param = params.getJSONObject(j);
+            if (param.get("name").equals("sensitivity")) {
+              return param.getInt("value");
+            }
+          }
+          break;
+        }
+      }
+
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      if (in != null) {
+        try {
+          in.close();
+        } catch (IOException e) {
+          LOG.warn(e);
+        }
+      }
+    }
+
+    return -1;
+  }
+
+  private String constructUri(String protocol, String host, String port, String path) {
+    StringBuilder sb = new StringBuilder(protocol);
+    sb.append("://");
+    sb.append(host);
+    sb.append(":");
+    sb.append(port);
+    sb.append(path);
+    return sb.toString();
+  }
+
+//  public static void main(String[] args) {
+//    AmbariServerInterface ambariServerInterface = new AmbariServerInterface();
+//    ambariServerInterface.getPointInTimeSensitivity("avijayan-ams-1.openstacklocal","c1");
+//  }
+}
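
A short sketch of querying the sensitivity through the class above; the Ambari host and cluster name are placeholders, and the call relies on the hard-coded admin:admin credentials and port 8080 shown in the code:

import org.apache.ambari.metrics.adservice.prototype.core.AmbariServerInterface;

public class SensitivityCheck {
  public static void main(String[] args) {
    // Returns the "sensitivity" parameter of the point_in_time_metrics_anomalies
    // alert definition, or -1 if it cannot be read.
    AmbariServerInterface ambari = new AmbariServerInterface("ambari.example.com", "c1");
    System.out.println("Configured sensitivity: " + ambari.getPointInTimeSensitivity());
  }
}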

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricKafkaProducer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricKafkaProducer.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricKafkaProducer.java
new file mode 100644
index 0000000..167fbb3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricKafkaProducer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class MetricKafkaProducer {
+
+    Producer<String, JsonNode> producer;
+    private static String topicName = "ambari-metrics-topic";
+
+    public MetricKafkaProducer(String kafkaServers) {
+        Properties configProperties = new Properties();
+        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); //"avijayan-ams-2.openstacklocal:6667"
+        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
+        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.connect.json.JsonSerializer");
+        producer = new KafkaProducer<>(configProperties);
+    }
+
+    public void sendMetrics(TimelineMetrics timelineMetrics) throws InterruptedException, ExecutionException {
+
+        ObjectMapper objectMapper = new ObjectMapper();
+        JsonNode jsonNode = objectMapper.valueToTree(timelineMetrics);
+        ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, JsonNode>(topicName,jsonNode);
+        Future<RecordMetadata> kafkaFuture =  producer.send(rec);
+
+        System.out.println(kafkaFuture.isDone());
+        System.out.println(kafkaFuture.get().topic());
+    }
+}
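
A minimal sketch of pushing one TimelineMetrics payload through the producer above; the broker address, metric name and values are illustrative:

import java.util.Collections;
import java.util.TreeMap;

import org.apache.ambari.metrics.adservice.prototype.core.MetricKafkaProducer;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

public class ProducerExample {
  public static void main(String[] args) throws Exception {
    // Build a single metric with one (timestamp, value) point.
    TimelineMetric metric = new TimelineMetric();
    metric.setMetricName("cpu_user");
    metric.setAppId("HOST");
    metric.setHostName("host1.example.com");
    long now = System.currentTimeMillis();
    metric.setStartTime(now);
    TreeMap<Long, Double> values = new TreeMap<>();
    values.put(now, 42.0);
    metric.setMetricValues(values);

    TimelineMetrics metrics = new TimelineMetrics();
    metrics.setMetrics(Collections.singletonList(metric));

    // Serializes the payload to JSON and sends it to the ambari-metrics-topic topic.
    new MetricKafkaProducer("kafka.example.com:6667").sendMetrics(metrics);
  }
}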

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java
new file mode 100644
index 0000000..e8257e5
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricSparkConsumer.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+import scala.Tuple2;
+
+import java.util.*;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class MetricSparkConsumer {
+
+  private static final Log LOG = LogFactory.getLog(MetricSparkConsumer.class);
+  private static String groupId = "ambari-metrics-group";
+  private static String topicName = "ambari-metrics-topic";
+  private static int numThreads = 1;
+  private static long pitStartTime = System.currentTimeMillis();
+  private static long ksStartTime = pitStartTime;
+  private static long hdevStartTime = ksStartTime;
+  private static Set<Pattern> includeMetricPatterns = new HashSet<>();
+  private static Set<String> includedHosts = new HashSet<>();
+  private static Set<TrendMetric> trendMetrics = new HashSet<>();
+
+  public MetricSparkConsumer() {
+  }
+
+  public static Properties readProperties(String propertiesFile) {
+    try {
+      Properties properties = new Properties();
+      InputStream inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile);
+      if (inputStream == null) {
+        inputStream = new FileInputStream(propertiesFile);
+      }
+      properties.load(inputStream);
+      return properties;
+    } catch (IOException ioEx) {
+      LOG.error("Error reading properties file for jmeter");
+      return null;
+    }
+  }
+
+  public static void main(String[] args) throws InterruptedException {
+
+    if (args.length < 1) {
+      System.err.println("Usage: MetricSparkConsumer <input-config-file>");
+      System.exit(1);
+    }
+
+    Properties properties = readProperties(args[0]);
+
+    List<String> appIds = Arrays.asList(properties.getProperty("appIds").split(","));
+
+    String collectorHost = properties.getProperty("collectorHost");
+    String collectorPort = properties.getProperty("collectorPort");
+    String collectorProtocol = properties.getProperty("collectorProtocol");
+
+    String zkQuorum = properties.getProperty("zkQuorum");
+
+    double emaW = Double.parseDouble(properties.getProperty("emaW"));
+    double emaN = Double.parseDouble(properties.getProperty("emaN"));
+    int emaThreshold = Integer.parseInt(properties.getProperty("emaThreshold"));
+    double tukeysN = Double.parseDouble(properties.getProperty("tukeysN"));
+
+    long pitTestInterval = Long.parseLong(properties.getProperty("pointInTimeTestInterval"));
+    long pitTrainInterval = Long.parseLong(properties.getProperty("pointInTimeTrainInterval"));
+
+    long ksTestInterval = Long.parseLong(properties.getProperty("ksTestInterval"));
+    long ksTrainInterval = Long.parseLong(properties.getProperty("ksTrainInterval"));
+    int hsdevNhp = Integer.parseInt(properties.getProperty("hsdevNhp"));
+    long hsdevInterval = Long.parseLong(properties.getProperty("hsdevInterval"));
+
+    String ambariServerHost = properties.getProperty("ambariServerHost");
+    String clusterName = properties.getProperty("clusterName");
+
+    String includeMetricPatternStrings = properties.getProperty("includeMetricPatterns");
+    if (includeMetricPatternStrings != null && !includeMetricPatternStrings.isEmpty()) {
+      String[] patterns = includeMetricPatternStrings.split(",");
+      for (String p : patterns) {
+        LOG.info("Included Pattern : " + p);
+        includeMetricPatterns.add(Pattern.compile(p));
+      }
+    }
+
+    String includedHostList = properties.getProperty("hosts");
+    if (includedHostList != null && !includedHostList.isEmpty()) {
+      String[] hosts = includedHostList.split(",");
+      includedHosts.addAll(Arrays.asList(hosts));
+    }
+
+    MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort);
+
+    SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector");
+
+    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
+
+    EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN, emaThreshold);
+    PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(metricsCollectorInterface,
+      tukeysN,
+      pitTestInterval,
+      pitTrainInterval,
+      ambariServerHost,
+      clusterName);
+
+    TrendADSystem trendADSystem = new TrendADSystem(metricsCollectorInterface,
+      ksTestInterval,
+      ksTrainInterval,
+      hsdevNhp);
+
+    Broadcast<EmaTechnique> emaTechniqueBroadcast = jssc.sparkContext().broadcast(emaTechnique);
+    Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast = jssc.sparkContext().broadcast(pointInTimeADSystem);
+    Broadcast<TrendADSystem> trendADSystemBroadcast = jssc.sparkContext().broadcast(trendADSystem);
+    Broadcast<MetricsCollectorInterface> metricsCollectorInterfaceBroadcast = jssc.sparkContext().broadcast(metricsCollectorInterface);
+    Broadcast<Set<Pattern>> includePatternBroadcast = jssc.sparkContext().broadcast(includeMetricPatterns);
+    Broadcast<Set<String>> includedHostBroadcast = jssc.sparkContext().broadcast(includedHosts);
+
+    JavaPairReceiverInputDStream<String, String> messages =
+      KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));
+
+    //Convert JSON string to TimelineMetrics.
+    JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() {
+      @Override
+      public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
+        ObjectMapper mapper = new ObjectMapper();
+        TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class);
+        return metrics;
+      }
+    });
+
+    timelineMetricsStream.print();
+
+    //Group TimelineMetric by AppId.
+    JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
+      timelineMetrics -> timelineMetrics.getMetrics().isEmpty()  ?  new Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics)
+    );
+
+    appMetricStream.print();
+
+    //Filter AppIds that are not needed.
+    JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
+      @Override
+      public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
+        return appIds.contains(appMetricTuple._1);
+      }
+    });
+
+    filteredAppMetricStream.print();
+
+    filteredAppMetricStream.foreachRDD(rdd -> {
+      rdd.foreach(
+        tuple2 -> {
+          long currentTime = System.currentTimeMillis();
+          EmaTechnique ema = emaTechniqueBroadcast.getValue();
+          if (currentTime > pitStartTime + pitTestInterval) {
+            LOG.info("Running Tukeys....");
+            pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime);
+            pitStartTime = pitStartTime + pitTestInterval;
+          }
+
+          if (currentTime > ksStartTime + ksTestInterval) {
+            LOG.info("Running KS Test....");
+            trendADSystemBroadcast.getValue().runKSTest(currentTime, trendMetrics);
+            ksStartTime = ksStartTime + ksTestInterval;
+          }
+
+          if (currentTime > hdevStartTime + hsdevInterval) {
+            LOG.info("Running HSdev Test....");
+            trendADSystemBroadcast.getValue().runHsdevMethod();
+            hdevStartTime = hdevStartTime + hsdevInterval;
+          }
+
+          TimelineMetrics metrics = tuple2._2();
+          for (TimelineMetric timelineMetric : metrics.getMetrics()) {
+
+            boolean includeHost = includedHostBroadcast.getValue().contains(timelineMetric.getHostName());
+            boolean includeMetric = false;
+            if (includeHost) {
+              if (includePatternBroadcast.getValue().isEmpty()) {
+                includeMetric = true;
+              }
+              for (Pattern p : includePatternBroadcast.getValue()) {
+                Matcher m = p.matcher(timelineMetric.getMetricName());
+                if (m.find()) {
+                  includeMetric = true;
+                }
+              }
+            }
+
+            if (includeMetric) {
+              trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(),
+                timelineMetric.getHostName()));
+              List<MetricAnomaly> anomalies = ema.test(timelineMetric);
+              metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
+            }
+          }
+        });
+    });
+
+    jssc.start();
+    jssc.awaitTermination();
+  }
+}
+
+
+
+
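
To see the detection flow without the Spark and Kafka plumbing, a hedged sketch that runs a single metric through the EMA technique and publishes any anomalies; the EMA parameters, metric, hosts and collector port are placeholders:

import java.util.List;
import java.util.TreeMap;

import org.apache.ambari.metrics.adservice.prototype.core.MetricsCollectorInterface;
import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;

public class EmaFlowExample {
  public static void main(String[] args) {
    // Illustrative EMA parameters: weight (emaW), deviation multiplier (emaN), threshold.
    EmaTechnique ema = new EmaTechnique(0.9, 3, 10);

    // One metric with a single (timestamp, value) point to test.
    TimelineMetric metric = new TimelineMetric();
    metric.setMetricName("regionserver.Server.readRequestCount");
    metric.setAppId("hbase");
    metric.setHostName("host1.example.com");
    long now = System.currentTimeMillis();
    metric.setStartTime(now);
    TreeMap<Long, Double> values = new TreeMap<>();
    values.put(now, 1000.0);
    metric.setMetricValues(values);

    // test() returns the anomalies found for this metric; publish() posts them back
    // to the collector as anomaly-engine metrics.
    List<MetricAnomaly> anomalies = ema.test(metric);
    new MetricsCollectorInterface("collector.example.com", "http", "6188").publish(anomalies);
  }
}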

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricsCollectorInterface.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricsCollectorInterface.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricsCollectorInterface.java
new file mode 100644
index 0000000..da3999a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/MetricsCollectorInterface.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeMap;
+
+public class MetricsCollectorInterface implements Serializable {
+
+  private static String hostName = null;
+  private String instanceId = null;
+  public final static String serviceName = "anomaly-engine";
+  private String collectorHost;
+  private String protocol;
+  private String port;
+  private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
+  private static final Log LOG = LogFactory.getLog(MetricsCollectorInterface.class);
+  private static ObjectMapper mapper;
+  private final static ObjectReader timelineObjectReader;
+
+  static {
+    mapper = new ObjectMapper();
+    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+    mapper.setAnnotationIntrospector(introspector);
+    mapper.getSerializationConfig()
+      .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+    timelineObjectReader = mapper.reader(TimelineMetrics.class);
+  }
+
+  public MetricsCollectorInterface(String collectorHost, String protocol, String port) {
+    this.collectorHost = collectorHost;
+    this.protocol = protocol;
+    this.port = port;
+    this.hostName = getDefaultLocalHostName();
+  }
+
+  public static String getDefaultLocalHostName() {
+
+    if (hostName != null) {
+      return hostName;
+    }
+
+    try {
+      return InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException e) {
+      LOG.info("Error getting host address");
+    }
+    return null;
+  }
+
+  public void publish(List<MetricAnomaly> metricAnomalies) {
+    if (CollectionUtils.isNotEmpty(metricAnomalies)) {
+      LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
+      List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
+      if (!metricList.isEmpty()) {
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.setMetrics(metricList);
+        emitMetrics(timelineMetrics);
+      }
+    } else {
+      LOG.debug("No anomalies to send.");
+    }
+  }
+
+  private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
+    List<TimelineMetric> metrics = new ArrayList<>();
+
+    if (metricAnomalies.isEmpty()) {
+      return metrics;
+    }
+
+    for (MetricAnomaly anomaly : metricAnomalies) {
+      TimelineMetric timelineMetric = new TimelineMetric();
+      timelineMetric.setMetricName(anomaly.getMetricKey());
+      timelineMetric.setAppId(serviceName + "-" + anomaly.getMethodType());
+      timelineMetric.setInstanceId(null);
+      timelineMetric.setHostName(getDefaultLocalHostName());
+      timelineMetric.setStartTime(anomaly.getTimestamp());
+      HashMap<String, String> metadata = new HashMap<>();
+      metadata.put("method", anomaly.getMethodType());
+      metadata.put("anomaly-score", String.valueOf(anomaly.getAnomalyScore()));
+      timelineMetric.setMetadata(metadata);
+      TreeMap<Long,Double> metricValues = new TreeMap<>();
+      metricValues.put(anomaly.getTimestamp(), anomaly.getMetricValue());
+      timelineMetric.setMetricValues(metricValues);
+
+      metrics.add(timelineMetric);
+    }
+    return metrics;
+  }
+
+  public boolean emitMetrics(TimelineMetrics metrics) {
+    String connectUrl = constructTimelineMetricUri();
+    String jsonData = null;
+    LOG.debug("EmitMetrics connectUrl = " + connectUrl);
+    try {
+      jsonData = mapper.writeValueAsString(metrics);
+      LOG.info(jsonData);
+    } catch (IOException e) {
+      LOG.error("Unable to parse metrics", e);
+    }
+    if (jsonData != null) {
+      return emitMetricsJson(connectUrl, jsonData);
+    }
+    return false;
+  }
+
+  private HttpURLConnection getConnection(String spec) throws IOException {
+    return (HttpURLConnection) new URL(spec).openConnection();
+  }
+
+  private boolean emitMetricsJson(String connectUrl, String jsonData) {
+    int timeout = 10000;
+    HttpURLConnection connection = null;
+    try {
+      if (connectUrl == null) {
+        throw new IOException("Unknown URL. Unable to connect to metrics collector.");
+      }
+      connection = getConnection(connectUrl);
+
+      connection.setRequestMethod("POST");
+      connection.setRequestProperty("Content-Type", "application/json");
+      connection.setRequestProperty("Connection", "Keep-Alive");
+      connection.setConnectTimeout(timeout);
+      connection.setReadTimeout(timeout);
+      connection.setDoOutput(true);
+
+      if (jsonData != null) {
+        try (OutputStream os = connection.getOutputStream()) {
+          os.write(jsonData.getBytes("UTF-8"));
+        }
+      }
+
+      int statusCode = connection.getResponseCode();
+
+      if (statusCode != 200) {
+        LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
+          "statusCode = " + statusCode);
+      } else {
+        LOG.info("Metrics posted to Collector " + connectUrl);
+      }
+      return true;
+    } catch (IOException ioe) {
+      LOG.error(ioe.getMessage());
+    }
+    return false;
+  }
+
+  private String constructTimelineMetricUri() {
+    StringBuilder sb = new StringBuilder(protocol);
+    sb.append("://");
+    sb.append(collectorHost);
+    sb.append(":");
+    sb.append(port);
+    sb.append(WS_V1_TIMELINE_METRICS);
+    return sb.toString();
+  }
+
+  public TimelineMetrics fetchMetrics(String metricName,
+                                      String appId,
+                                      String hostname,
+                                      long startime,
+                                      long endtime) {
+
+    String url = constructTimelineMetricUri() + "?metricNames=" + metricName + "&appId=" + appId +
+      "&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + endtime;
+    LOG.debug("Fetch metrics URL : " + url);
+
+    URL obj = null;
+    BufferedReader in = null;
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+    try {
+      obj = new URL(url);
+      HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+      con.setRequestMethod("GET");
+      int responseCode = con.getResponseCode();
+      LOG.debug("Sending 'GET' request to URL : " + url);
+      LOG.debug("Response Code : " + responseCode);
+
+      in = new BufferedReader(
+        new InputStreamReader(con.getInputStream()));
+      timelineMetrics = timelineObjectReader.readValue(in);
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      if (in != null) {
+        try {
+          in.close();
+        } catch (IOException e) {
+          LOG.warn(e);
+        }
+      }
+    }
+
+    LOG.info("Fetched " + timelineMetrics.getMetrics().size() + " metrics.");
+    return timelineMetrics;
+  }
+}
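
A short sketch of the fetch side of the collector interface above; the collector host, port, metric and time window are illustrative:

import org.apache.ambari.metrics.adservice.prototype.core.MetricsCollectorInterface;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

public class FetchExample {
  public static void main(String[] args) {
    MetricsCollectorInterface collector =
        new MetricsCollectorInterface("collector.example.com", "http", "6188");

    long end = System.currentTimeMillis();
    long start = end - 60 * 60 * 1000; // last hour

    // Issues a GET against /ws/v1/timeline/metrics for the given metric, appId, host and window.
    TimelineMetrics metrics = collector.fetchMetrics(
        "regionserver.Server.readRequestCount", "hbase", "host1.example.com", start, end);
    System.out.println("Fetched series: " + metrics.getMetrics().size());
  }
}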

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java
new file mode 100644
index 0000000..0a2271a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/PointInTimeADSystem.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+import org.apache.ambari.metrics.adservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaModel;
+import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class PointInTimeADSystem implements Serializable {
+
+  //private EmaTechnique emaTechnique;
+  private MetricsCollectorInterface metricsCollectorInterface;
+  private Map<String, Double> tukeysNMap;
+  private double defaultTukeysN = 3;
+
+  private long testIntervalMillis = 5*60*1000; //5mins
+  private long trainIntervalMillis = 15*60*1000; //15mins
+
+  private static final Log LOG = LogFactory.getLog(PointInTimeADSystem.class);
+
+  private AmbariServerInterface ambariServerInterface;
+  private int sensitivity = 50;
+  private int minSensitivity = 0;
+  private int maxSensitivity = 100;
+
+  public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN,
+                             long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) {
+    this.metricsCollectorInterface = metricsCollectorInterface;
+    this.defaultTukeysN = defaultTukeysN;
+    this.tukeysNMap = new HashMap<>();
+    this.testIntervalMillis = testIntervalMillis;
+    this.trainIntervalMillis = trainIntervalMillis;
+    this.ambariServerInterface = new AmbariServerInterface(ambariServerHost, clusterName);
+    LOG.info("Starting PointInTimeADSystem...");
+  }
+
+  public void runTukeysAndRefineEma(EmaTechnique emaTechnique, long startTime) {
+    LOG.info("Running Tukeys for test data interval [" + new Date(startTime - testIntervalMillis) + " : " + new Date(startTime) + "], with train data period [" + new Date(startTime  - testIntervalMillis - trainIntervalMillis) + " : " + new Date(startTime - testIntervalMillis) + "]");
+
+    int requiredSensitivity = ambariServerInterface.getPointInTimeSensitivity();
+    if (requiredSensitivity == -1 || requiredSensitivity == sensitivity) {
+      LOG.info("No change in sensitivity needed.");
+    } else {
+      LOG.info("Current tukey's N value = " + defaultTukeysN);
+      if (requiredSensitivity > sensitivity) {
+        int targetSensitivity = Math.min(maxSensitivity, requiredSensitivity);
+        while (sensitivity < targetSensitivity) {
+          defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.05;
+          sensitivity++;
+        }
+      } else {
+        int targetSensitivity = Math.max(minSensitivity, requiredSensitivity);
+        while (sensitivity > targetSensitivity) {
+          defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.05;
+          sensitivity--;
+        }
+      }
+      LOG.info("New tukey's N value = " + defaultTukeysN);
+    }
+
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    for (String metricKey : emaTechnique.getTrackedEmas().keySet()) {
+      LOG.info("EMA key = " + metricKey);
+      EmaModel emaModel = emaTechnique.getTrackedEmas().get(metricKey);
+      String metricName = emaModel.getMetricName();
+      String appId = emaModel.getAppId();
+      String hostname = emaModel.getHostname();
+
+      TimelineMetrics tukeysData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, startTime - (testIntervalMillis + trainIntervalMillis),
+        startTime);
+
+      if (tukeysData.getMetrics().isEmpty()) {
+        LOG.info("No metrics fetched for Tukeys, metricKey = " + metricKey);
+        continue;
+      }
+
+      List<Double> trainTsList = new ArrayList<>();
+      List<Double> trainDataList = new ArrayList<>();
+      List<Double> testTsList = new ArrayList<>();
+      List<Double> testDataList = new ArrayList<>();
+
+      for (TimelineMetric metric : tukeysData.getMetrics()) {
+        for (Long timestamp : metric.getMetricValues().keySet()) {
+          if (timestamp <= (startTime - testIntervalMillis)) {
+            trainDataList.add(metric.getMetricValues().get(timestamp));
+            trainTsList.add((double)timestamp);
+          } else {
+            testDataList.add(metric.getMetricValues().get(timestamp));
+            testTsList.add((double)timestamp);
+          }
+        }
+      }
+
+      if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+        LOG.info("Not enough train/test data to perform analysis.");
+        continue;
+      }
+
+      String tukeysTrainSeries = "tukeysTrainSeries";
+      double[] trainTs = new double[trainTsList.size()];
+      double[] trainData = new double[trainTsList.size()];
+      for (int i = 0; i < trainTs.length; i++) {
+        trainTs[i] = trainTsList.get(i);
+        trainData[i] = trainDataList.get(i);
+      }
+
+      String tukeysTestSeries = "tukeysTestSeries";
+      double[] testTs = new double[testTsList.size()];
+      double[] testData = new double[testTsList.size()];
+      for (int i = 0; i < testTs.length; i++) {
+        testTs[i] = testTsList.get(i);
+        testData[i] = testDataList.get(i);
+      }
+
+      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
+
+      DataSeries tukeysTrainData = new DataSeries(tukeysTrainSeries, trainTs, trainData);
+      DataSeries tukeysTestData = new DataSeries(tukeysTestSeries, testTs, testData);
+
+      if (!tukeysNMap.containsKey(metricKey)) {
+        tukeysNMap.put(metricKey, defaultTukeysN);
+      }
+
+      Map<String, String> configs = new HashMap<>();
+      configs.put("tukeys.n", String.valueOf(tukeysNMap.get(metricKey)));
+
+      ResultSet rs = RFunctionInvoker.tukeys(tukeysTrainData, tukeysTestData, configs);
+
+      List<TimelineMetric> tukeysMetrics = getAsTimelineMetric(rs, metricName, appId, hostname);
+      LOG.info("Tukeys anomalies size : " + tukeysMetrics.size());
+      TreeMap<Long, Double> tukeysMetricValues = new TreeMap<>();
+
+      for (TimelineMetric tukeysMetric : tukeysMetrics) {
+        tukeysMetricValues.putAll(tukeysMetric.getMetricValues());
+        timelineMetrics.addOrMergeTimelineMetric(tukeysMetric);
+      }
+
+      TimelineMetrics emaData = metricsCollectorInterface.fetchMetrics(metricKey, MetricsCollectorInterface.serviceName+"-ema", MetricsCollectorInterface.getDefaultLocalHostName(), startTime - testIntervalMillis, startTime);
+      TreeMap<Long, Double> emaMetricValues = new TreeMap<>();
+      if (!emaData.getMetrics().isEmpty()) {
+        emaMetricValues = emaData.getMetrics().get(0).getMetricValues();
+      }
+
+      LOG.info("Ema anomalies size : " + emaMetricValues.size());
+      int tp = 0;
+      int tn = 0;
+      int fp = 0;
+      int fn = 0;
+
+      for (double ts : testTs) {
+        long timestamp = (long) ts;
+        if (tukeysMetricValues.containsKey(timestamp)) {
+          if (emaMetricValues.containsKey(timestamp)) {
+            tp++;
+          } else {
+            fn++;
+          }
+        } else {
+          if (emaMetricValues.containsKey(timestamp)) {
+            fp++;
+          } else {
+            tn++;
+          }
+        }
+      }
+
+      double recall = (double) tp / (double) (tp + fn);
+      double precision = (double) tp / (double) (tp + fp);
+      LOG.info("----------------------------");
+      LOG.info("Precision Recall values for " + metricKey);
+      LOG.info("tp=" + tp + ", fp=" + fp + ", tn=" + tn + ", fn=" + fn);
+      LOG.info("precision=" + precision + ", recall=" + recall);
+      LOG.info("----------------------------");
+
+      if (recall < 0.5) {
+        LOG.info("Increasing EMA sensitivity by 10%");
+        emaModel.updateModel(true, 5);
+      } else if (precision < 0.5) {
+        LOG.info("Decreasing EMA sensitivity by 10%");
+        emaModel.updateModel(false, 5);
+      }
+
+    }
+
+    if (emaTechnique.getTrackedEmas().isEmpty()) {
+      LOG.info("No EMA keys are currently being tracked.");
+    }
+
+    if (!timelineMetrics.getMetrics().isEmpty()) {
+      metricsCollectorInterface.emitMetrics(timelineMetrics);
+    }
+  }
+
+  private static List<TimelineMetric> getAsTimelineMetric(ResultSet result, String metricName, String appId, String hostname) {
+
+    List<TimelineMetric> timelineMetrics = new ArrayList<>();
+
+    if (result == null) {
+      LOG.info("ResultSet from R call is null. Returning empty result.");
+      return timelineMetrics;
+    }
+
+    if (result.resultset.size() > 0) {
+      double[] ts = result.resultset.get(0);
+      double[] metrics = result.resultset.get(1);
+      double[] anomalyScore = result.resultset.get(2);
+      for (int i = 0; i < ts.length; i++) {
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(metricName + ":" + appId + ":" + hostname);
+        timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+        timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys");
+        timelineMetric.setInstanceId(null);
+        timelineMetric.setStartTime((long) ts[i]);
+        TreeMap<Long, Double> metricValues = new TreeMap<>();
+        metricValues.put((long) ts[i], metrics[i]);
+
+        HashMap<String, String> metadata = new HashMap<>();
+        metadata.put("method", "tukeys");
+        if (Double.isInfinite(anomalyScore[i])) {
+          LOG.info("Got anomalyScore = infinity for " + metricName + ":" + appId + ":" + hostname);
+        } else {
+          metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
+        }
+        timelineMetric.setMetadata(metadata);
+
+        timelineMetric.setMetricValues(metricValues);
+        timelineMetrics.add(timelineMetric);
+      }
+    }
+
+    return timelineMetrics;
+  }
+}
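
The sensitivity handling above walks Tukey's N up or down by 5% for every unit of sensitivity change requested by the Ambari server. A minimal closed-form sketch of that same mapping, for reference only (the adjustTukeysN helper below is illustrative and not part of this patch):

  // Closed-form equivalent of the step-wise loop in runTukeysAndRefineEma():
  // each unit of sensitivity change scales Tukey's N by 5%.
  static double adjustTukeysN(double currentN, int currentSensitivity, int targetSensitivity) {
    int delta = targetSensitivity - currentSensitivity;
    return delta >= 0 ? currentN * Math.pow(1.05, delta)
                      : currentN * Math.pow(0.95, -delta);
  }

  // Example: moving sensitivity from 50 to 60 takes the default N of 3 to
  // 3 * 1.05^10 ~= 4.89; moving it from 50 to 40 takes it to 3 * 0.95^10 ~= 1.80.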

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java
new file mode 100644
index 0000000..8f1eba6
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+
+import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.adservice.prototype.common.ResultSet;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.rosuda.JRI.REXP;
+import org.rosuda.JRI.RVector;
+import org.rosuda.JRI.Rengine;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class RFunctionInvoker {
+
+  static final Log LOG = LogFactory.getLog(RFunctionInvoker.class);
+  public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
+  private static String rScriptDir = "/usr/lib/ambari-metrics-collector/R-scripts";
+
+  private static void loadDataSets(Rengine r, DataSeries trainData, DataSeries testData) {
+    r.assign("train_ts", trainData.ts);
+    r.assign("train_x", trainData.values);
+    r.eval("train_data <- data.frame(train_ts,train_x)");
+    r.eval("names(train_data) <- c(\"TS\", " + trainData.seriesName + ")");
+
+    r.assign("test_ts", testData.ts);
+    r.assign("test_x", testData.values);
+    r.eval("test_data <- data.frame(test_ts,test_x)");
+    r.eval("names(test_data) <- c(\"TS\", " + testData.seriesName + ")");
+  }
+
+  public static void setScriptsDir(String dir) {
+    rScriptDir = dir;
+  }
+
+  public static ResultSet executeMethod(String methodType, DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+
+    ResultSet result;
+    switch (methodType) {
+      case "tukeys":
+        result = tukeys(trainData, testData, configs);
+        break;
+      case "ema":
+        result = ema_global(trainData, testData, configs);
+        break;
+      case "ks":
+        result = ksTest(trainData, testData, configs);
+        break;
+      case "hsdev":
+        result = hsdev(trainData, testData, configs);
+        break;
+      default:
+        result = tukeys(trainData, testData, configs);
+        break;
+    }
+    return result;
+  }
+
+  public static ResultSet tukeys(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+    try {
+
+      r.eval("source('" + rScriptDir + "/tukeys.r" + "')");
+
+      double n = Double.parseDouble(configs.get("tukeys.n"));
+      r.eval("n <- " + n);
+
+      loadDataSets(r, trainData, testData);
+
+      r.eval("an <- ams_tukeys(train_data, test_data, n)");
+      REXP exp = r.eval("an");
+      RVector cont = (RVector) exp.getContent();
+      List<double[]> result = new ArrayList<>();
+      for (int i = 0; i < cont.size(); i++) {
+        result.add(cont.at(i).asDoubleArray());
+      }
+      return new ResultSet(result);
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      r.end();
+    }
+    return null;
+  }
+
+  public static ResultSet ema_global(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+    try {
+      r.eval("source('" + rScriptDir + "/ema.r" + "')");
+
+      int n = Integer.parseInt(configs.get("ema.n"));
+      r.eval("n <- " + n);
+
+      double w = Double.parseDouble(configs.get("ema.w"));
+      r.eval("w <- " + w);
+
+      loadDataSets(r, trainData, testData);
+
+      r.eval("an <- ema_global(train_data, test_data, w, n)");
+      REXP exp = r.eval("an");
+      RVector cont = (RVector) exp.getContent();
+      List<double[]> result = new ArrayList<>();
+      for (int i = 0; i < cont.size(); i++) {
+        result.add(cont.at(i).asDoubleArray());
+      }
+      return new ResultSet(result);
+
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      r.end();
+    }
+    return null;
+  }
+
+  public static ResultSet ema_daily(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+    try {
+      r.eval("source('" + rScriptDir + "/ema.r" + "')");
+
+      int n = Integer.parseInt(configs.get("ema.n"));
+      r.eval("n <- " + n);
+
+      double w = Double.parseDouble(configs.get("ema.w"));
+      r.eval("w <- " + w);
+
+      loadDataSets(r, trainData, testData);
+
+      r.eval("an <- ema_daily(train_data, test_data, w, n)");
+      REXP exp = r.eval("an");
+      RVector cont = (RVector) exp.getContent();
+      List<double[]> result = new ArrayList<>();
+      for (int i = 0; i < cont.size(); i++) {
+        result.add(cont.at(i).asDoubleArray());
+      }
+      return new ResultSet(result);
+
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      r.end();
+    }
+    return null;
+  }
+
+  public static ResultSet ksTest(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+    try {
+      r.eval("source('" + rScriptDir + "/kstest.r" + "')");
+
+      double p_value = Double.parseDouble(configs.get("ks.p_value"));
+      r.eval("p_value <- " + p_value);
+
+      loadDataSets(r, trainData, testData);
+
+      r.eval("an <- ams_ks(train_data, test_data, p_value)");
+      REXP exp = r.eval("an");
+      RVector cont = (RVector) exp.getContent();
+      List<double[]> result = new ArrayList<>();
+      for (int i = 0; i < cont.size(); i++) {
+        result.add(cont.at(i).asDoubleArray());
+      }
+      return new ResultSet(result);
+
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      r.end();
+    }
+    return null;
+  }
+
+  public static ResultSet hsdev(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+    try {
+      r.eval("source('" + rScriptDir + "/hsdev.r" + "')");
+
+      int n = Integer.parseInt(configs.get("hsdev.n"));
+      r.eval("n <- " + n);
+
+      int nhp = Integer.parseInt(configs.get("hsdev.nhp"));
+      r.eval("nhp <- " + nhp);
+
+      long interval = Long.parseLong(configs.get("hsdev.interval"));
+      r.eval("interval <- " + interval);
+
+      long period = Long.parseLong(configs.get("hsdev.period"));
+      r.eval("period <- " + period);
+
+      loadDataSets(r, trainData, testData);
+
+      r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)");
+      REXP exp = r.eval("an2");
+      RVector cont = (RVector) exp.getContent();
+
+      List<double[]> result = new ArrayList<>();
+      for (int i = 0; i < cont.size(); i++) {
+        result.add(cont.at(i).asDoubleArray());
+      }
+      return new ResultSet(result);
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      r.end();
+    }
+    return null;
+  }
+}
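
For reference, a minimal sketch of driving RFunctionInvoker.tukeys() directly; the series values, script directory, and class name below are illustrative, and the sketch assumes JRI and the bundled R scripts are available on the host. Note that each helper calls r.end() in its finally block, so as written the sketch assumes a single R invocation per JVM.

  import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
  import org.apache.ambari.metrics.adservice.prototype.common.ResultSet;

  import java.util.HashMap;
  import java.util.Map;

  public class RFunctionInvokerSketch {
    public static void main(String[] args) {
      // Made-up train/test series; the 250.0 test point should fall outside the Tukey fences.
      DataSeries train = new DataSeries("trainSeries",
          new double[]{1000, 2000, 3000, 4000}, new double[]{10, 11, 9, 10});
      DataSeries test = new DataSeries("testSeries",
          new double[]{5000, 6000}, new double[]{10, 250});

      Map<String, String> configs = new HashMap<>();
      configs.put("tukeys.n", "3"); // same key PointInTimeADSystem passes in

      RFunctionInvoker.setScriptsDir("/path/to/R-scripts"); // illustrative location
      ResultSet rs = RFunctionInvoker.tukeys(train, test, configs);
      if (rs != null && !rs.resultset.isEmpty()) {
        // resultset holds parallel arrays: timestamps, values, anomaly scores.
        System.out.println("Anomalous points found: " + rs.resultset.get(0).length);
      }
    }
  }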

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java
new file mode 100644
index 0000000..f5ec83a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.adservice.prototype.methods.hsdev.HsdevTechnique;
+import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.adservice.prototype.methods.kstest.KSTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+public class TrendADSystem implements Serializable {
+
+  private MetricsCollectorInterface metricsCollectorInterface;
+  private List<TrendMetric> trendMetrics;
+
+  private long ksTestIntervalMillis = 10 * 60 * 1000;
+  private long ksTrainIntervalMillis = 10 * 60 * 1000;
+  private KSTechnique ksTechnique;
+
+  private HsdevTechnique hsdevTechnique;
+  private int hsdevNumHistoricalPeriods = 3;
+
+  private Map<KsSingleRunKey, MetricAnomaly> trackedKsAnomalies = new HashMap<>();
+  private static final Log LOG = LogFactory.getLog(TrendADSystem.class);
+  private String inputFile = "";
+
+  public TrendADSystem(MetricsCollectorInterface metricsCollectorInterface,
+                       long ksTestIntervalMillis,
+                       long ksTrainIntervalMillis,
+                       int hsdevNumHistoricalPeriods) {
+
+    this.metricsCollectorInterface = metricsCollectorInterface;
+    this.ksTestIntervalMillis = ksTestIntervalMillis;
+    this.ksTrainIntervalMillis = ksTrainIntervalMillis;
+    this.hsdevNumHistoricalPeriods = hsdevNumHistoricalPeriods;
+
+    this.ksTechnique = new KSTechnique();
+    this.hsdevTechnique = new HsdevTechnique();
+
+    trendMetrics = new ArrayList<>();
+  }
+
+  public void runKSTest(long currentEndTime, Set<TrendMetric> trendMetrics) {
+    readInputFile(inputFile);
+
+    long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis;
+    LOG.info("Running KS Test for test data interval [" + new Date(ksTestIntervalStartTime) + " : " +
+      new Date(currentEndTime) + "], with train data period [" + new Date(ksTestIntervalStartTime - ksTrainIntervalMillis)
+      + " : " + new Date(ksTestIntervalStartTime) + "]");
+
+    for (TrendMetric metric : trendMetrics) {
+      String metricName = metric.metricName;
+      String appId = metric.appId;
+      String hostname = metric.hostname;
+      String key = metricName + ":" + appId + ":" + hostname;
+
+      TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, ksTestIntervalStartTime - ksTrainIntervalMillis,
+        currentEndTime);
+
+      if (ksData.getMetrics().isEmpty()) {
+        LOG.info("No metrics fetched for KS, metricKey = " + key);
+        continue;
+      }
+
+      List<Double> trainTsList = new ArrayList<>();
+      List<Double> trainDataList = new ArrayList<>();
+      List<Double> testTsList = new ArrayList<>();
+      List<Double> testDataList = new ArrayList<>();
+
+      for (TimelineMetric timelineMetric : ksData.getMetrics()) {
+        for (Long timestamp : timelineMetric.getMetricValues().keySet()) {
+          if (timestamp <= ksTestIntervalStartTime) {
+            trainDataList.add(timelineMetric.getMetricValues().get(timestamp));
+            trainTsList.add((double) timestamp);
+          } else {
+            testDataList.add(timelineMetric.getMetricValues().get(timestamp));
+            testTsList.add((double) timestamp);
+          }
+        }
+      }
+
+      LOG.info("Train Data size : " + trainDataList.size() + ", Test Data Size : " + testDataList.size());
+      if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+        LOG.info("Not enough train/test data to perform KS analysis.");
+        continue;
+      }
+
+      String ksTrainSeries = "KSTrainSeries";
+      double[] trainTs = new double[trainTsList.size()];
+      double[] trainData = new double[trainTsList.size()];
+      for (int i = 0; i < trainTs.length; i++) {
+        trainTs[i] = trainTsList.get(i);
+        trainData[i] = trainDataList.get(i);
+      }
+
+      String ksTestSeries = "KSTestSeries";
+      double[] testTs = new double[testTsList.size()];
+      double[] testData = new double[testTsList.size()];
+      for (int i = 0; i < testTs.length; i++) {
+        testTs[i] = testTsList.get(i);
+        testData[i] = testDataList.get(i);
+      }
+
+      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
+
+      DataSeries ksTrainData = new DataSeries(ksTrainSeries, trainTs, trainData);
+      DataSeries ksTestData = new DataSeries(ksTestSeries, testTs, testData);
+
+      MetricAnomaly metricAnomaly = ksTechnique.runKsTest(key, ksTrainData, ksTestData);
+      if (metricAnomaly == null) {
+        LOG.info("No anomaly from KS test.");
+      } else {
+        LOG.info("Found Anomaly in KS Test. Publishing KS Anomaly metric....");
+        TimelineMetric timelineMetric = getAsTimelineMetric(metricAnomaly,
+          ksTestIntervalStartTime, currentEndTime, ksTestIntervalStartTime - ksTrainIntervalMillis, ksTestIntervalStartTime);
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        metricsCollectorInterface.emitMetrics(timelineMetrics);
+
+        trackedKsAnomalies.put(new KsSingleRunKey(ksTestIntervalStartTime, currentEndTime, metricName, appId, hostname), metricAnomaly);
+      }
+    }
+
+    if (trendMetrics.isEmpty()) {
+      LOG.info("No Trend metrics tracked!!!!");
+    }
+
+  }
+
+  private TimelineMetric getAsTimelineMetric(MetricAnomaly metricAnomaly,
+                                   long testStart,
+                                   long testEnd,
+                                   long trainStart,
+                                   long trainEnd) {
+
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(metricAnomaly.getMetricKey());
+    timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-" + metricAnomaly.getMethodType());
+    timelineMetric.setInstanceId(null);
+    timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+    timelineMetric.setStartTime(testEnd);
+    HashMap<String, String> metadata = new HashMap<>();
+    metadata.put("method", metricAnomaly.getMethodType());
+    metadata.put("anomaly-score", String.valueOf(metricAnomaly.getAnomalyScore()));
+    metadata.put("test-start-time", String.valueOf(testStart));
+    metadata.put("train-start-time", String.valueOf(trainStart));
+    metadata.put("train-end-time", String.valueOf(trainEnd));
+    timelineMetric.setMetadata(metadata);
+    TreeMap<Long,Double> metricValues = new TreeMap<>();
+    metricValues.put(testEnd, metricAnomaly.getMetricValue());
+    timelineMetric.setMetricValues(metricValues);
+    return timelineMetric;
+
+  }
+
+  public void runHsdevMethod() {
+
+    List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>();
+
+    for (KsSingleRunKey ksSingleRunKey : trackedKsAnomalies.keySet()) {
+
+      long hsdevTestEnd = ksSingleRunKey.endTime;
+      long hsdevTestStart = ksSingleRunKey.startTime;
+
+      long period = hsdevTestEnd - hsdevTestStart;
+
+      long hsdevTrainStart = hsdevTestStart - (hsdevNumHistoricalPeriods) * period;
+      long hsdevTrainEnd = hsdevTestStart;
+
+      LOG.info("Running HSdev Test for test data interval [" + new Date(hsdevTestStart) + " : " +
+        new Date(hsdevTestEnd) + "], with train data period [" + new Date(hsdevTrainStart)
+        + " : " + new Date(hsdevTrainEnd) + "]");
+
+      String metricName = ksSingleRunKey.metricName;
+      String appId = ksSingleRunKey.appId;
+      String hostname = ksSingleRunKey.hostname;
+      String key = metricName + "_" + appId + "_" + hostname;
+
+      TimelineMetrics hsdevData = metricsCollectorInterface.fetchMetrics(
+        metricName,
+        appId,
+        hostname,
+        hsdevTrainStart,
+        hsdevTestEnd);
+
+      if (hsdevData.getMetrics().isEmpty()) {
+        LOG.info("No metrics fetched for HSDev, metricKey = " + key);
+        continue;
+      }
+
+      List<Double> trainTsList = new ArrayList<>();
+      List<Double> trainDataList = new ArrayList<>();
+      List<Double> testTsList = new ArrayList<>();
+      List<Double> testDataList = new ArrayList<>();
+
+      for (TimelineMetric timelineMetric : hsdevData.getMetrics()) {
+        for (Long timestamp : timelineMetric.getMetricValues().keySet()) {
+          if (timestamp <= hsdevTestStart) {
+            trainDataList.add(timelineMetric.getMetricValues().get(timestamp));
+            trainTsList.add((double) timestamp);
+          } else {
+            testDataList.add(timelineMetric.getMetricValues().get(timestamp));
+            testTsList.add((double) timestamp);
+          }
+        }
+      }
+
+      if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+        LOG.info("Not enough train/test data to perform Hsdev analysis.");
+        continue;
+      }
+
+      String hsdevTrainSeries = "HsdevTrainSeries";
+      double[] trainTs = new double[trainTsList.size()];
+      double[] trainData = new double[trainTsList.size()];
+      for (int i = 0; i < trainTs.length; i++) {
+        trainTs[i] = trainTsList.get(i);
+        trainData[i] = trainDataList.get(i);
+      }
+
+      String hsdevTestSeries = "HsdevTestSeries";
+      double[] testTs = new double[testTsList.size()];
+      double[] testData = new double[testTsList.size()];
+      for (int i = 0; i < testTs.length; i++) {
+        testTs[i] = testTsList.get(i);
+        testData[i] = testDataList.get(i);
+      }
+
+      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
+
+      DataSeries hsdevTrainData = new DataSeries(hsdevTrainSeries, trainTs, trainData);
+      DataSeries hsdevTestData = new DataSeries(hsdevTestSeries, testTs, testData);
+
+      MetricAnomaly metricAnomaly = hsdevTechnique.runHsdevTest(key, hsdevTrainData, hsdevTestData);
+      if (metricAnomaly == null) {
+        LOG.info("No anomaly from Hsdev test. Mismatch between KS and HSDev. ");
+        ksTechnique.updateModel(key, false, 10);
+      } else {
+        LOG.info("Found Anomaly in Hsdev Test. This confirms KS anomaly.");
+        hsdevMetricAnomalies.add(getAsTimelineMetric(metricAnomaly,
+          hsdevTestStart, hsdevTestEnd, hsdevTrainStart, hsdevTrainEnd));
+      }
+    }
+    clearTrackedKsRunKeys();
+
+    if (!hsdevMetricAnomalies.isEmpty()) {
+      LOG.info("Publishing Hsdev Anomalies....");
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(hsdevMetricAnomalies);
+      metricsCollectorInterface.emitMetrics(timelineMetrics);
+    }
+  }
+
+  private void clearTrackedKsRunKeys() {
+    trackedKsAnomalies.clear();
+  }
+
+  private void readInputFile(String fileName) {
+    trendMetrics.clear();
+    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
+      for (String line; (line = br.readLine()) != null; ) {
+        String[] splits = line.split(",");
+        if (splits.length < 3) {
+          LOG.warn("Skipping malformed line in Trend AD input file : " + line);
+          continue;
+        }
+        LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]);
+        trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2]));
+      }
+    } catch (IOException e) {
+      LOG.error("Error reading input file : " + e);
+    }
+  }
+
+  class KsSingleRunKey implements Serializable {
+
+    long startTime;
+    long endTime;
+    String metricName;
+    String appId;
+    String hostname;
+
+    public KsSingleRunKey(long startTime, long endTime, String metricName, String appId, String hostname) {
+      this.startTime = startTime;
+      this.endTime = endTime;
+      this.metricName = metricName;
+      this.appId = appId;
+      this.hostname = hostname;
+    }
+  }
+}
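
The Trend AD system reads the metrics it should track from a plain comma-separated input file (see readInputFile above), one metric per line in the form metricName,appId,hostname. A sample file, with illustrative metric and host names:

  regionserver.Server.totalRequestCount,hbase,c6401.ambari.apache.org
  mem_free,HOST,c6402.ambari.apache.org

Each line becomes one TrendMetric(metricName, appId, hostname), which runKSTest() then fetches from the collector and tests.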

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java
new file mode 100644
index 0000000..d4db227
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.core;
+
+import java.io.Serializable;
+
+public class TrendMetric implements Serializable {
+
+  String metricName;
+  String appId;
+  String hostname;
+
+  public TrendMetric(String metricName, String appId, String hostname) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.hostname = hostname;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java
new file mode 100644
index 0000000..c19adda
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.methods;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.util.List;
+
+public abstract class AnomalyDetectionTechnique {
+
+  protected String methodType;
+
+  public abstract List<MetricAnomaly> test(TimelineMetric metric);
+
+}
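
For orientation, a minimal sketch of a concrete technique built on this base class. It assumes MetricAnomaly has a constructor of the form (metricKey, timestamp, metricValue, methodType, anomalyScore); only its getters are visible in this commit, so treat that signature as an assumption. The static-threshold technique itself is illustrative and not part of this patch.

  import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;

  // Illustrative subclass: flags any point above a fixed threshold.
  public class StaticThresholdTechnique extends AnomalyDetectionTechnique {

    private final double threshold;

    public StaticThresholdTechnique(double threshold) {
      this.methodType = "static-threshold";
      this.threshold = threshold;
    }

    @Override
    public List<MetricAnomaly> test(TimelineMetric metric) {
      List<MetricAnomaly> anomalies = new ArrayList<>();
      String key = metric.getMetricName() + ":" + metric.getAppId() + ":" + metric.getHostName();
      for (Map.Entry<Long, Double> point : metric.getMetricValues().entrySet()) {
        if (point.getValue() > threshold) {
          // Assumed constructor; the anomaly score here is simply the excess over the threshold.
          anomalies.add(new MetricAnomaly(key, point.getKey(), point.getValue(),
              methodType, point.getValue() - threshold));
        }
      }
      return anomalies;
    }
  }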