You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@ambari.apache.org by av...@apache.org on 2018/04/01 19:14:09 UTC

[ambari] 13/39: AMBARI-21686 : Implement a test driver that provides a set of metric series with different kinds of metric behavior. (avijayan)

This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit e466cc4fd06a30c9fd62eb99923576664233f06e
Author: Aravindan Vijayan <av...@hortonworks.com>
AuthorDate: Thu Sep 21 15:40:35 2017 -0700

    AMBARI-21686 : Implement a test driver that provides a set of metric series with different kinds of metric behavior. (avijayan)
---
 ambari-metrics/ambari-metrics-alertservice/pom.xml |  17 +-
 .../ambari/metrics/alertservice/R/AmsRTest.java    | 147 --------
 .../metrics/alertservice/R/RFunctionInvoker.java   | 192 -----------
 .../metrics/alertservice/common/MetricAnomaly.java |  69 ----
 .../common/SingleValuedTimelineMetric.java         | 103 ------
 .../alertservice/common/TimelineMetric.java        | 238 -------------
 .../alertservice/common/TimelineMetrics.java       | 129 -------
 .../metrics/alertservice/methods/ema/EmaDS.java    |  70 ----
 .../metrics/alertservice/methods/ema/EmaModel.java | 129 -------
 .../alertservice/methods/ema/TestEmaModel.java     |  68 ----
 .../prototype/AmbariServerInterface.java           | 122 +++++++
 .../prototype/MetricAnomalyDetectorTestInput.java  | 126 +++++++
 .../prototype/MetricAnomalyTester.java             | 163 +++++++++
 .../MetricKafkaProducer.java}                      |  54 +--
 .../prototype/MetricSparkConsumer.java             | 178 ++++++++++
 .../prototype/MetricsCollectorInterface.java       | 237 +++++++++++++
 .../prototype/PointInTimeADSystem.java             | 256 ++++++++++++++
 .../alertservice/prototype/RFunctionInvoker.java   | 222 ++++++++++++
 .../prototype/TestSeriesInputRequest.java          |  88 +++++
 .../alertservice/prototype/TrendADSystem.java      | 331 ++++++++++++++++++
 .../TrendMetric.java}                              |  18 +-
 .../common/DataSeries.java}                        |  12 +-
 .../{ => prototype}/common/ResultSet.java          |   4 +-
 .../{ => prototype}/common/StatisticUtils.java     |  41 +--
 .../methods/AnomalyDetectionTechnique.java}        |  26 +-
 .../prototype/methods/MetricAnomaly.java           |  86 +++++
 .../prototype/methods/ema/EmaModel.java            | 124 +++++++
 .../methods/ema/EmaModelLoader.java                |  28 +-
 .../prototype/methods/ema/EmaTechnique.java        | 142 ++++++++
 .../prototype/methods/hsdev/HsdevTechnique.java    |  77 +++++
 .../prototype/methods/kstest/KSTechnique.java      | 101 ++++++
 .../AbstractMetricSeries.java}                     |  12 +-
 .../seriesgenerator/DualBandMetricSeries.java      |  88 +++++
 .../MetricSeriesGeneratorFactory.java              | 379 +++++++++++++++++++++
 .../seriesgenerator/MonotonicMetricSeries.java     | 101 ++++++
 .../seriesgenerator/NormalMetricSeries.java        |  81 +++++
 .../SteadyWithTurbulenceMetricSeries.java          | 115 +++++++
 .../seriesgenerator/StepFunctionMetricSeries.java  | 107 ++++++
 .../seriesgenerator/UniformMetricSeries.java       |  95 ++++++
 .../alertservice/spark/AnomalyMetricPublisher.java | 196 -----------
 .../alertservice/spark/MetricAnomalyDetector.java  | 147 --------
 .../src/main/resources/R-scripts/hsdev.r           |  12 +-
 .../src/main/resources/R-scripts/kstest.r          |   2 +-
 .../src/main/resources/R-scripts/tukeys.r          |   9 +-
 .../src/main/resources/R-scripts/util.R            |  36 --
 .../alertservice/prototype/TestEmaTechnique.java   |  86 +++++
 .../prototype/TestRFunctionInvoker.java            | 161 +++++++++
 .../metrics/alertservice/prototype/TestTukeys.java | 100 ++++++
 .../seriesgenerator/MetricSeriesGeneratorTest.java | 108 ++++++
 .../metrics2/sink/timeline/TimelineMetric.java     |   5 +-
 .../metrics2/sink/timeline/TimelineMetrics.java    |   3 +-
 .../metrics/spark/MetricAnomalyDetector.scala      |  18 +-
 .../ambari/metrics/spark/SparkPhoenixReader.scala  |  18 +-
 .../ambari-metrics-timelineservice/pom.xml         |   2 +-
 .../timeline/HBaseTimelineMetricsService.java      |  24 --
 .../metrics/timeline/MetricsPaddingMethodTest.java |   7 -
 56 files changed, 3794 insertions(+), 1716 deletions(-)

diff --git a/ambari-metrics/ambari-metrics-alertservice/pom.xml b/ambari-metrics/ambari-metrics-alertservice/pom.xml
index 4afc80f..4db8a6a 100644
--- a/ambari-metrics/ambari-metrics-alertservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-alertservice/pom.xml
@@ -31,7 +31,6 @@
     <build>
         <plugins>
             <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
                 <configuration>
                     <source>1.8</source>
@@ -130,5 +129,21 @@
             <artifactId>spark-mllib_2.10</artifactId>
             <version>1.3.0</version>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+            <version>4.10</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ambari</groupId>
+            <artifactId>ambari-metrics-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.2.5</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/AmsRTest.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/AmsRTest.java
deleted file mode 100644
index 2bbc250..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/AmsRTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.R;
-
-import org.apache.ambari.metrics.alertservice.common.ResultSet;
-import org.apache.ambari.metrics.alertservice.common.DataSet;
-import org.apache.commons.lang.ArrayUtils;
-import org.rosuda.JRI.REXP;
-import org.rosuda.JRI.RVector;
-import org.rosuda.JRI.Rengine;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-public class AmsRTest {
-
-    public static void main(String[] args) {
-
-        String metricName = "TestMetric";
-        double[] ts = getTS(1000);
-
-        double[] train_ts = ArrayUtils.subarray(ts, 0,750);
-        double[] train_x = getData(750);
-        DataSet trainData = new DataSet(metricName, train_ts, train_x);
-
-        double[] test_ts = ArrayUtils.subarray(ts, 750,1000);
-        double[] test_x = getData(250);
-        test_x[50] = 5.5; //Anomaly
-        DataSet testData = new DataSet(metricName, test_ts, test_x);
-        ResultSet rs;
-
-        Map<String, String> configs = new HashMap();
-
-        System.out.println("TUKEYS");
-        configs.put("tukeys.n", "3");
-        rs = RFunctionInvoker.tukeys(trainData, testData, configs);
-        rs.print();
-        System.out.println("--------------");
-
-        System.out.println("EMA Global");
-        configs.put("ema.n", "3");
-        configs.put("ema.w", "0.8");
-        rs = RFunctionInvoker.ema_global(trainData, testData, configs);
-        rs.print();
-        System.out.println("--------------");
-
-        System.out.println("EMA Daily");
-        rs = RFunctionInvoker.ema_daily(trainData, testData, configs);
-        rs.print();
-        System.out.println("--------------");
-
-        configs.put("ks.p_value", "0.05");
-        System.out.println("KS Test");
-        rs = RFunctionInvoker.ksTest(trainData, testData, configs);
-        rs.print();
-        System.out.println("--------------");
-
-        ts = getTS(5000);
-        train_ts = ArrayUtils.subarray(ts, 30,4800);
-        train_x = getData(4800);
-        trainData = new DataSet(metricName, train_ts, train_x);
-        test_ts = ArrayUtils.subarray(ts, 4800,5000);
-        test_x = getData(200);
-        for (int i =0; i<200;i++) {
-            test_x[i] = test_x[i]*5;
-        }
-        testData = new DataSet(metricName, test_ts, test_x);
-        configs.put("hsdev.n", "3");
-        configs.put("hsdev.nhp", "3");
-        configs.put("hsdev.interval", "86400000");
-        configs.put("hsdev.period", "604800000");
-        System.out.println("HSdev");
-        rs = RFunctionInvoker.hsdev(trainData, testData, configs);
-        rs.print();
-        System.out.println("--------------");
-
-    }
-
-    static double[] getTS(int n) {
-        long currentTime = System.currentTimeMillis();
-        double[] ts = new double[n];
-        currentTime = currentTime - (currentTime % (5*60*1000));
-
-        for (int i = 0,j=n-1; i<n; i++,j--) {
-            ts[j] = currentTime;
-            currentTime = currentTime - (5*60*1000);
-        }
-        return ts;
-    }
-
-    static void testBasic() {
-        Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
-        try {
-            r.eval("library(ambarimetricsAD)");
-            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/test.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
-            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/util.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
-            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/tukeys.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
-            double[] ts = getTS(5000);
-            double[] x = getData(5000);
-            r.assign("ts", ts);
-            r.assign("x", x);
-            r.eval("x[1000] <- 4.5");
-            r.eval("x[2000] <- 4.75");
-            r.eval("x[3000] <- 3.5");
-            r.eval("x[4000] <- 5.5");
-            r.eval("x[5000] <- 5.0");
-            r.eval("data <- data.frame(ts,x)");
-            r.eval("names(data) <- c(\"TS\", \"Metric\")");
-            System.out.println(r.eval("data"));
-            REXP exp = r.eval("t_an <- test_methods(data)");
-            exp = r.eval("t_an");
-            String strExp = exp.asString();
-            System.out.println("result:" + exp);
-            RVector cont = (RVector) exp.getContent();
-            double[] an_ts = cont.at(0).asDoubleArray();
-            double[] an_x = cont.at(1).asDoubleArray();
-            System.out.println("result:" + strExp);
-        }
-        finally {
-            r.end();
-        }
-    }
-    static double[] getData(int n) {
-        double[] metrics = new double[n];
-        Random random = new Random();
-        for (int i = 0; i<n; i++) {
-            metrics[i] = random.nextDouble();
-        }
-        return metrics;
-    }
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java
deleted file mode 100644
index 2713b71..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.R;
-
-
-import org.apache.ambari.metrics.alertservice.common.ResultSet;
-import org.apache.ambari.metrics.alertservice.common.DataSet;
-import org.rosuda.JRI.REXP;
-import org.rosuda.JRI.RVector;
-import org.rosuda.JRI.Rengine;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class RFunctionInvoker {
-
-    public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
-
-
-    private static void loadDataSets(Rengine r, DataSet trainData, DataSet testData) {
-        r.assign("train_ts", trainData.ts);
-        r.assign("train_x", trainData.values);
-        r.eval("train_data <- data.frame(train_ts,train_x)");
-        r.eval("names(train_data) <- c(\"TS\", " + trainData.metricName + ")");
-
-        r.assign("test_ts", testData.ts);
-        r.assign("test_x", testData.values);
-        r.eval("test_data <- data.frame(test_ts,test_x)");
-        r.eval("names(test_data) <- c(\"TS\", " + testData.metricName + ")");
-    }
-
-
-    public static ResultSet tukeys(DataSet trainData, DataSet testData, Map<String, String> configs) {
-        try {
-            r.eval("source('tukeys.r', echo=TRUE)");
-
-            int n = Integer.parseInt(configs.get("tukeys.n"));
-            r.eval("n <- " + n);
-
-            loadDataSets(r, trainData, testData);
-
-            r.eval("an <- ams_tukeys(train_data, test_data, n)");
-            REXP exp = r.eval("an");
-            RVector cont = (RVector) exp.getContent();
-            List<double[]> result = new ArrayList();
-            for (int i = 0; i< cont.size(); i++) {
-                result.add(cont.at(i).asDoubleArray());
-            }
-            return new ResultSet(result);
-        } catch(Exception e) {
-            e.printStackTrace();
-        } finally {
-            r.end();
-        }
-        return null;
-    }
-
-    public static ResultSet ema_global(DataSet trainData, DataSet testData, Map<String, String> configs) {
-        try {
-            r.eval("source('ema.R', echo=TRUE)");
-
-            int n = Integer.parseInt(configs.get("ema.n"));
-            r.eval("n <- " + n);
-
-            double w = Double.parseDouble(configs.get("ema.w"));
-            r.eval("w <- " + w);
-
-            loadDataSets(r, trainData, testData);
-
-            r.eval("an <- ema_global(train_data, test_data, w, n)");
-            REXP exp = r.eval("an");
-            RVector cont = (RVector) exp.getContent();
-            List<double[]> result = new ArrayList();
-            for (int i = 0; i< cont.size(); i++) {
-                result.add(cont.at(i).asDoubleArray());
-            }
-            return new ResultSet(result);
-
-        } catch(Exception e) {
-            e.printStackTrace();
-        } finally {
-            r.end();
-        }
-        return null;
-    }
-
-    public static ResultSet ema_daily(DataSet trainData, DataSet testData, Map<String, String> configs) {
-        try {
-            r.eval("source('ema.R', echo=TRUE)");
-
-            int n = Integer.parseInt(configs.get("ema.n"));
-            r.eval("n <- " + n);
-
-            double w = Double.parseDouble(configs.get("ema.w"));
-            r.eval("w <- " + w);
-
-            loadDataSets(r, trainData, testData);
-
-            r.eval("an <- ema_daily(train_data, test_data, w, n)");
-            REXP exp = r.eval("an");
-            RVector cont = (RVector) exp.getContent();
-            List<double[]> result = new ArrayList();
-            for (int i = 0; i< cont.size(); i++) {
-                result.add(cont.at(i).asDoubleArray());
-            }
-            return new ResultSet(result);
-
-        } catch(Exception e) {
-            e.printStackTrace();
-        } finally {
-            r.end();
-        }
-        return null;
-    }
-
-    public static ResultSet ksTest(DataSet trainData, DataSet testData, Map<String, String> configs) {
-        try {
-            r.eval("source('kstest.r', echo=TRUE)");
-
-            double p_value = Double.parseDouble(configs.get("ks.p_value"));
-            r.eval("p_value <- " + p_value);
-
-            loadDataSets(r, trainData, testData);
-
-            r.eval("an <- ams_ks(train_data, test_data, p_value)");
-            REXP exp = r.eval("an");
-            RVector cont = (RVector) exp.getContent();
-            List<double[]> result = new ArrayList();
-            for (int i = 0; i< cont.size(); i++) {
-                result.add(cont.at(i).asDoubleArray());
-            }
-            return new ResultSet(result);
-
-        } catch(Exception e) {
-            e.printStackTrace();
-        } finally {
-            r.end();
-        }
-        return null;
-    }
-
-    public static ResultSet hsdev(DataSet trainData, DataSet testData, Map<String, String> configs) {
-        try {
-            r.eval("source('hsdev.r', echo=TRUE)");
-
-            int n = Integer.parseInt(configs.get("hsdev.n"));
-            r.eval("n <- " + n);
-
-            int nhp = Integer.parseInt(configs.get("hsdev.nhp"));
-            r.eval("nhp <- " + nhp);
-
-            long interval = Long.parseLong(configs.get("hsdev.interval"));
-            r.eval("interval <- " + interval);
-
-            long period = Long.parseLong(configs.get("hsdev.period"));
-            r.eval("period <- " + period);
-
-            loadDataSets(r, trainData, testData);
-
-            r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)");
-            REXP exp = r.eval("an2");
-            RVector cont = (RVector) exp.getContent();
-
-            List<double[]> result = new ArrayList();
-            for (int i = 0; i< cont.size(); i++) {
-                result.add(cont.at(i).asDoubleArray());
-            }
-            return new ResultSet(result);
-        } catch(Exception e) {
-            e.printStackTrace();
-        } finally {
-            r.end();
-        }
-        return null;
-    }
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MetricAnomaly.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MetricAnomaly.java
deleted file mode 100644
index 4dbb425..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MetricAnomaly.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.common;
-
-public class MetricAnomaly {
-
-    private String metricKey;
-    private long timestamp;
-    private double metricValue;
-    private MethodResult methodResult;
-
-    public MetricAnomaly(String metricKey, long timestamp, double metricValue, MethodResult methodResult) {
-        this.metricKey = metricKey;
-        this.timestamp = timestamp;
-        this.metricValue = metricValue;
-        this.methodResult = methodResult;
-    }
-
-    public String getMetricKey() {
-        return metricKey;
-    }
-
-    public void setMetricName(String metricName) {
-        this.metricKey = metricName;
-    }
-
-    public long getTimestamp() {
-        return timestamp;
-    }
-
-    public void setTimestamp(long timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    public double getMetricValue() {
-        return metricValue;
-    }
-
-    public void setMetricValue(double metricValue) {
-        this.metricValue = metricValue;
-    }
-
-    public MethodResult getMethodResult() {
-        return methodResult;
-    }
-
-    public void setMethodResult(MethodResult methodResult) {
-        this.methodResult = methodResult;
-    }
-
-    public String getAnomalyAsString() {
-        return metricKey + ":" + timestamp + ":" + metricValue + ":" + methodResult.prettyPrint();
-    }
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/SingleValuedTimelineMetric.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/SingleValuedTimelineMetric.java
deleted file mode 100644
index acd4452..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/SingleValuedTimelineMetric.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.common;
-
-
-public class SingleValuedTimelineMetric {
-    private Long timestamp;
-    private Double value;
-    private String metricName;
-    private String appId;
-    private String instanceId;
-    private String hostName;
-    private Long startTime;
-    private String type;
-
-    public void setSingleTimeseriesValue(Long timestamp, Double value) {
-        this.timestamp = timestamp;
-        this.value = value;
-    }
-
-    public SingleValuedTimelineMetric(String metricName, String appId,
-                                      String instanceId, String hostName,
-                                      long timestamp, long startTime, String type) {
-        this.metricName = metricName;
-        this.appId = appId;
-        this.instanceId = instanceId;
-        this.hostName = hostName;
-        this.timestamp = timestamp;
-        this.startTime = startTime;
-        this.type = type;
-    }
-
-    public Long getTimestamp() {
-        return timestamp;
-    }
-
-    public long getStartTime() {
-        return startTime;
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public Double getValue() {
-        return value;
-    }
-
-    public String getMetricName() {
-        return metricName;
-    }
-
-    public String getAppId() {
-        return appId;
-    }
-
-    public String getInstanceId() {
-        return instanceId;
-    }
-
-    public String getHostName() {
-        return hostName;
-    }
-
-    public boolean equalsExceptTime(TimelineMetric metric) {
-        if (!metricName.equals(metric.getMetricName())) return false;
-        if (hostName != null ? !hostName.equals(metric.getHostName()) : metric.getHostName() != null)
-            return false;
-        if (appId != null ? !appId.equals(metric.getAppId()) : metric.getAppId() != null)
-            return false;
-        if (instanceId != null ? !instanceId.equals(metric.getInstanceId()) : metric.getInstanceId() != null) return false;
-
-        return true;
-    }
-
-    public TimelineMetric getTimelineMetric() {
-        TimelineMetric metric = new TimelineMetric();
-        metric.setMetricName(this.metricName);
-        metric.setAppId(this.appId);
-        metric.setHostName(this.hostName);
-        metric.setType(this.type);
-        metric.setInstanceId(this.instanceId);
-        metric.setStartTime(this.startTime);
-        metric.setTimestamp(this.timestamp);
-        metric.getMetricValues().put(timestamp, value);
-        return metric;
-    }
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetric.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetric.java
deleted file mode 100644
index 88ad834..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetric.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.common;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-@XmlRootElement(name = "metric")
-@XmlAccessorType(XmlAccessType.NONE)
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class TimelineMetric implements Comparable<TimelineMetric>, Serializable {
-
-    private String metricName;
-    private String appId;
-    private String instanceId;
-    private String hostName;
-    private long timestamp;
-    private long startTime;
-    private String type;
-    private String units;
-    private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
-    private Map<String, String> metadata = new HashMap<>();
-
-    // default
-    public TimelineMetric() {
-
-    }
-
-    public TimelineMetric(String metricName, String appId, String hostName, TreeMap<Long,Double> metricValues) {
-        this.metricName = metricName;
-        this.appId = appId;
-        this.hostName = hostName;
-        this.metricValues.putAll(metricValues);
-    }
-
-    // copy constructor
-    public TimelineMetric(TimelineMetric metric) {
-        setMetricName(metric.getMetricName());
-        setType(metric.getType());
-        setUnits(metric.getUnits());
-        setTimestamp(metric.getTimestamp());
-        setAppId(metric.getAppId());
-        setInstanceId(metric.getInstanceId());
-        setHostName(metric.getHostName());
-        setStartTime(metric.getStartTime());
-        setMetricValues(new TreeMap<Long, Double>(metric.getMetricValues()));
-    }
-
-    @XmlElement(name = "metricname")
-    public String getMetricName() {
-        return metricName;
-    }
-
-    public void setMetricName(String metricName) {
-        this.metricName = metricName;
-    }
-
-    @XmlElement(name = "appid")
-    public String getAppId() {
-        return appId;
-    }
-
-    public void setAppId(String appId) {
-        this.appId = appId;
-    }
-
-    @XmlElement(name = "instanceid")
-    public String getInstanceId() {
-        return instanceId;
-    }
-
-    public void setInstanceId(String instanceId) {
-        this.instanceId = instanceId;
-    }
-
-    @XmlElement(name = "hostname")
-    public String getHostName() {
-        return hostName;
-    }
-
-    public void setHostName(String hostName) {
-        this.hostName = hostName;
-    }
-
-    @XmlElement(name = "timestamp")
-    public long getTimestamp() {
-        return timestamp;
-    }
-
-    public void setTimestamp(long timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    @XmlElement(name = "starttime")
-    public long getStartTime() {
-        return startTime;
-    }
-
-    public void setStartTime(long startTime) {
-        this.startTime = startTime;
-    }
-
-    @XmlElement(name = "type", defaultValue = "UNDEFINED")
-    public String getType() {
-        return type;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    @XmlElement(name = "units")
-    public String getUnits() {
-        return units;
-    }
-
-    public void setUnits(String units) {
-        this.units = units;
-    }
-
-    @XmlElement(name = "metrics")
-    public TreeMap<Long, Double> getMetricValues() {
-        return metricValues;
-    }
-
-    public void setMetricValues(TreeMap<Long, Double> metricValues) {
-        this.metricValues = metricValues;
-    }
-
-    public void addMetricValues(Map<Long, Double> metricValues) {
-        this.metricValues.putAll(metricValues);
-    }
-
-    @XmlElement(name = "metadata")
-    public Map<String,String> getMetadata () {
-        return metadata;
-    }
-
-    public void setMetadata (Map<String,String> metadata) {
-        this.metadata = metadata;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        TimelineMetric metric = (TimelineMetric) o;
-
-        if (!metricName.equals(metric.metricName)) return false;
-        if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
-            return false;
-        if (appId != null ? !appId.equals(metric.appId) : metric.appId != null)
-            return false;
-        if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
-            return false;
-        if (timestamp != metric.timestamp) return false;
-        if (startTime != metric.startTime) return false;
-
-        return true;
-    }
-
-    public boolean equalsExceptTime(TimelineMetric metric) {
-        if (!metricName.equals(metric.metricName)) return false;
-        if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
-            return false;
-        if (appId != null ? !appId.equals(metric.appId) : metric.appId != null)
-            return false;
-        if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
-            return false;
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = metricName.hashCode();
-        result = 31 * result + (appId != null ? appId.hashCode() : 0);
-        result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
-        result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
-        result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
-        return result;
-    }
-
-    @Override
-    public int compareTo(TimelineMetric other) {
-        if (timestamp > other.timestamp) {
-            return -1;
-        } else if (timestamp < other.timestamp) {
-            return 1;
-        } else {
-            return metricName.compareTo(other.metricName);
-        }
-    }
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetrics.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetrics.java
deleted file mode 100644
index 7df6a9c..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/TimelineMetrics.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.common;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * The class that hosts a list of timeline entities.
- */
-@XmlRootElement(name = "metrics")
-@XmlAccessorType(XmlAccessType.NONE)
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class TimelineMetrics implements Serializable {
-
-    private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
-
-    public TimelineMetrics() {}
-
-    @XmlElement(name = "metrics")
-    public List<TimelineMetric> getMetrics() {
-        return allMetrics;
-    }
-
-    public void setMetrics(List<TimelineMetric> allMetrics) {
-        this.allMetrics = allMetrics;
-    }
-
-    private boolean isEqualTimelineMetrics(TimelineMetric metric1,
-                                           TimelineMetric metric2) {
-
-        boolean isEqual = true;
-
-        if (!metric1.getMetricName().equals(metric2.getMetricName())) {
-            return false;
-        }
-
-        if (metric1.getHostName() != null) {
-            isEqual = metric1.getHostName().equals(metric2.getHostName());
-        }
-
-        if (metric1.getAppId() != null) {
-            isEqual = metric1.getAppId().equals(metric2.getAppId());
-        }
-
-        return isEqual;
-    }
-
-    /**
-     * Merge with existing TimelineMetric if everything except startTime is
-     * the same.
-     * @param metric {@link TimelineMetric}
-     */
-    public void addOrMergeTimelineMetric(TimelineMetric metric) {
-        TimelineMetric metricToMerge = null;
-
-        if (!allMetrics.isEmpty()) {
-            for (TimelineMetric timelineMetric : allMetrics) {
-                if (timelineMetric.equalsExceptTime(metric)) {
-                    metricToMerge = timelineMetric;
-                    break;
-                }
-            }
-        }
-
-        if (metricToMerge != null) {
-            metricToMerge.addMetricValues(metric.getMetricValues());
-            if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
-                metricToMerge.setTimestamp(metric.getTimestamp());
-            }
-            if (metricToMerge.getStartTime() > metric.getStartTime()) {
-                metricToMerge.setStartTime(metric.getStartTime());
-            }
-        } else {
-            allMetrics.add(metric);
-        }
-    }
-
-    // Optimization that addresses too many TreeMaps from getting created.
-    public void addOrMergeTimelineMetric(SingleValuedTimelineMetric metric) {
-        TimelineMetric metricToMerge = null;
-
-        if (!allMetrics.isEmpty()) {
-            for (TimelineMetric timelineMetric : allMetrics) {
-                if (metric.equalsExceptTime(timelineMetric)) {
-                    metricToMerge = timelineMetric;
-                    break;
-                }
-            }
-        }
-
-        if (metricToMerge != null) {
-            metricToMerge.getMetricValues().put(metric.getTimestamp(), metric.getValue());
-            if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
-                metricToMerge.setTimestamp(metric.getTimestamp());
-            }
-            if (metricToMerge.getStartTime() > metric.getStartTime()) {
-                metricToMerge.setStartTime(metric.getStartTime());
-            }
-        } else {
-            allMetrics.add(metric.getTimelineMetric());
-        }
-    }
-}
-
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaDS.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaDS.java
deleted file mode 100644
index 32cd96b..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaDS.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.methods.ema;
-
-import com.sun.org.apache.commons.logging.Log;
-import com.sun.org.apache.commons.logging.LogFactory;
-
-import javax.xml.bind.annotation.XmlRootElement;
-import java.io.Serializable;
-
-@XmlRootElement
-public class EmaDS implements Serializable {
-
-    String metricName;
-    String appId;
-    String hostname;
-    double ema;
-    double ems;
-    double weight;
-    int timessdev;
-    private static final Log LOG = LogFactory.getLog(EmaDS.class);
-
-    public EmaDS(String metricName, String appId, String hostname, double weight, int timessdev) {
-        this.metricName = metricName;
-        this.appId = appId;
-        this.hostname = hostname;
-        this.weight = weight;
-        this.timessdev = timessdev;
-        this.ema = 0.0;
-        this.ems = 0.0;
-    }
-
-
-    public EmaResult testAndUpdate(double metricValue) {
-
-        double diff  = Math.abs(ema - metricValue) - (timessdev * ems);
-
-        ema = weight * ema + (1 - weight) * metricValue;
-        ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0));
-        LOG.info(ema + ", " + ems);
-        return diff > 0 ? new EmaResult(diff) : null;
-    }
-
-    public void update(double metricValue) {
-        ema = weight * ema + (1 - weight) * metricValue;
-        ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0));
-        LOG.info(ema + ", " + ems);
-    }
-
-    public EmaResult test(double metricValue) {
-        double diff  = Math.abs(ema - metricValue) - (timessdev * ems);
-        return diff > 0 ? new EmaResult(diff) : null;
-    }
-
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModel.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModel.java
deleted file mode 100644
index 13a0f55..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModel.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.methods.ema;
-
-import com.google.gson.Gson;
-import com.sun.org.apache.commons.logging.Log;
-import com.sun.org.apache.commons.logging.LogFactory;
-import org.apache.ambari.metrics.alertservice.common.MethodResult;
-import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
-import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel;
-import org.apache.spark.SparkContext;
-import org.apache.spark.mllib.util.Saveable;
-
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.io.*;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@XmlRootElement
-public class EmaModel implements MetricAnomalyModel, Saveable, Serializable {
-
-    @XmlElement(name = "trackedEmas")
-    private Map<String, EmaDS> trackedEmas = new HashMap<>();
-    private static final Log LOG = LogFactory.getLog(EmaModel.class);
-
-    public List<MetricAnomaly> onNewMetric(TimelineMetric metric) {
-
-        String metricName = metric.getMetricName();
-        String appId = metric.getAppId();
-        String hostname = metric.getHostName();
-        String key = metricName + "_" + appId + "_" + hostname;
-        List<MetricAnomaly> anomalies = new ArrayList<>();
-
-        if (!trackedEmas.containsKey(metricName)) {
-            trackedEmas.put(key, new EmaDS(metricName, appId, hostname, 0.8, 3));
-        }
-
-        EmaDS emaDS = trackedEmas.get(key);
-        for (Long timestamp : metric.getMetricValues().keySet()) {
-            double metricValue = metric.getMetricValues().get(timestamp);
-            MethodResult result = emaDS.testAndUpdate(metricValue);
-            if (result != null) {
-                MetricAnomaly metricAnomaly = new MetricAnomaly(key,timestamp, metricValue, result);
-                anomalies.add(metricAnomaly);
-            }
-        }
-        return anomalies;
-    }
-
-    public EmaDS train(TimelineMetric metric, double weight, int timessdev) {
-
-        String metricName = metric.getMetricName();
-        String appId = metric.getAppId();
-        String hostname = metric.getHostName();
-        String key = metricName + "_" + appId + "_" + hostname;
-
-        EmaDS emaDS = new EmaDS(metric.getMetricName(), metric.getAppId(), metric.getHostName(), weight, timessdev);
-        LOG.info("In EMA Train step");
-        for (Long timestamp : metric.getMetricValues().keySet()) {
-            emaDS.update(metric.getMetricValues().get(timestamp));
-        }
-        trackedEmas.put(key, emaDS);
-        return emaDS;
-    }
-
-    public List<MetricAnomaly> test(TimelineMetric metric) {
-        String metricName = metric.getMetricName();
-        String appId = metric.getAppId();
-        String hostname = metric.getHostName();
-        String key = metricName + "_" + appId + "_" + hostname;
-
-        EmaDS emaDS = trackedEmas.get(key);
-
-        if (emaDS == null) {
-            return new ArrayList<>();
-        }
-
-        List<MetricAnomaly> anomalies = new ArrayList<>();
-
-        for (Long timestamp : metric.getMetricValues().keySet()) {
-            double metricValue = metric.getMetricValues().get(timestamp);
-            MethodResult result = emaDS.testAndUpdate(metricValue);
-            if (result != null) {
-                MetricAnomaly metricAnomaly = new MetricAnomaly(key,timestamp, metricValue, result);
-                anomalies.add(metricAnomaly);
-            }
-        }
-        return anomalies;
-    }
-
-    @Override
-    public void save(SparkContext sc, String path) {
-        Gson gson = new Gson();
-        try {
-            String json = gson.toJson(this);
-            try (Writer writer = new BufferedWriter(new OutputStreamWriter(
-                    new FileOutputStream(path), "utf-8"))) {
-                writer.write(json);
-            }        } catch (IOException e) {
-            LOG.error(e);
-        }
-    }
-
-    @Override
-    public String formatVersion() {
-        return "1.0";
-    }
-
-}
-
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/TestEmaModel.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/TestEmaModel.java
deleted file mode 100644
index b851dab..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/TestEmaModel.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.methods.ema;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.gson.Gson;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
-
-import java.io.*;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class TestEmaModel {
-
-    public static void main(String[] args) throws IOException {
-
-        long now = System.currentTimeMillis();
-        TimelineMetric metric1 = new TimelineMetric();
-        metric1.setMetricName("dummy_metric");
-        metric1.setHostName("dummy_host");
-        metric1.setTimestamp(now);
-        metric1.setStartTime(now - 1000);
-        metric1.setAppId("HOST");
-        metric1.setType("Integer");
-
-        TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
-
-        for (int i = 0; i<20;i++) {
-            double metric = 9 + Math.random();
-            metricValues.put(now - i*100, metric);
-        }
-        metric1.setMetricValues(metricValues);
-
-        EmaModel emaModel = new EmaModel();
-
-        emaModel.train(metric1, 0.8, 3);
-    }
-
-    /*
-     {{
-            put(now - 100, 1.20);
-            put(now - 200, 1.25);
-            put(now - 300, 1.30);
-            put(now - 400, 4.50);
-            put(now - 500, 1.35);
-            put(now - 400, 5.50);
-        }}
-     */
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java
new file mode 100644
index 0000000..0c1c6fc
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+public class AmbariServerInterface implements Serializable{
+
+  private static final Log LOG = LogFactory.getLog(AmbariServerInterface.class);
+
+  private String ambariServerHost;
+  private String clusterName;
+
+  public AmbariServerInterface(String ambariServerHost, String clusterName) {
+    this.ambariServerHost = ambariServerHost;
+    this.clusterName = clusterName;
+  }
+
+  public int getPointInTimeSensitivity() {
+
+    String url = constructUri("http", ambariServerHost, "8080", "/api/v1/clusters/" + clusterName + "/alert_definitions?fields=*");
+
+    URL obj = null;
+    BufferedReader in = null;
+
+    try {
+      obj = new URL(url);
+      HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+      con.setRequestMethod("GET");
+
+      String encoded = Base64.getEncoder().encodeToString(("admin:admin").getBytes(StandardCharsets.UTF_8));
+      con.setRequestProperty("Authorization", "Basic "+encoded);
+
+      int responseCode = con.getResponseCode();
+      LOG.info("Sending 'GET' request to URL : " + url);
+      LOG.info("Response Code : " + responseCode);
+
+      in = new BufferedReader(
+        new InputStreamReader(con.getInputStream()));
+
+      StringBuilder responseJsonSb = new StringBuilder();
+      String line;
+      while ((line = in.readLine()) != null) {
+        responseJsonSb.append(line);
+      }
+
+      JSONObject jsonObject = new JSONObject(responseJsonSb.toString());
+      JSONArray array = jsonObject.getJSONArray("items");
+      for(int i = 0 ; i < array.length() ; i++){
+        JSONObject alertDefn = array.getJSONObject(i).getJSONObject("AlertDefinition");
+        LOG.info("alertDefn : " + alertDefn.get("name"));
+        if (alertDefn.get("name") != null && alertDefn.get("name").equals("point_in_time_metrics_anomalies")) {
+          JSONObject sourceNode = alertDefn.getJSONObject("source");
+          JSONArray params = sourceNode.getJSONArray("parameters");
+          for(int j = 0 ; j < params.length() ; j++){
+            JSONObject param = params.getJSONObject(j);
+            if (param.get("name").equals("sensitivity")) {
+              return param.getInt("value");
+            }
+          }
+          break;
+        }
+      }
+
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      if (in != null) {
+        try {
+          in.close();
+        } catch (IOException e) {
+          LOG.warn(e);
+        }
+      }
+    }
+
+    return -1;
+  }
+
+  private String constructUri(String protocol, String host, String port, String path) {
+    StringBuilder sb = new StringBuilder(protocol);
+    sb.append("://");
+    sb.append(host);
+    sb.append(":");
+    sb.append(port);
+    sb.append(path);
+    return sb.toString();
+  }
+
+//  public static void main(String[] args) {
+//    AmbariServerInterface ambariServerInterface = new AmbariServerInterface();
+//    ambariServerInterface.getPointInTimeSensitivity("avijayan-ams-1.openstacklocal","c1");
+//  }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyDetectorTestInput.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyDetectorTestInput.java
new file mode 100644
index 0000000..490328a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyDetectorTestInput.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.List;
+import java.util.Map;
+
+@XmlRootElement
+public class MetricAnomalyDetectorTestInput {
+
+  public MetricAnomalyDetectorTestInput() {
+  }
+
+  //Train data
+  private String trainDataName;
+  private String trainDataType;
+  private Map<String, String> trainDataConfigs;
+  private int trainDataSize;
+
+  //Test data
+  private String testDataName;
+  private String testDataType;
+  private Map<String, String> testDataConfigs;
+  private int testDataSize;
+
+  //Algorithm data
+  private List<String> methods;
+  private Map<String, String> methodConfigs;
+
+  public String getTrainDataName() {
+    return trainDataName;
+  }
+
+  public void setTrainDataName(String trainDataName) {
+    this.trainDataName = trainDataName;
+  }
+
+  public String getTrainDataType() {
+    return trainDataType;
+  }
+
+  public void setTrainDataType(String trainDataType) {
+    this.trainDataType = trainDataType;
+  }
+
+  public Map<String, String> getTrainDataConfigs() {
+    return trainDataConfigs;
+  }
+
+  public void setTrainDataConfigs(Map<String, String> trainDataConfigs) {
+    this.trainDataConfigs = trainDataConfigs;
+  }
+
+  public String getTestDataName() {
+    return testDataName;
+  }
+
+  public void setTestDataName(String testDataName) {
+    this.testDataName = testDataName;
+  }
+
+  public String getTestDataType() {
+    return testDataType;
+  }
+
+  public void setTestDataType(String testDataType) {
+    this.testDataType = testDataType;
+  }
+
+  public Map<String, String> getTestDataConfigs() {
+    return testDataConfigs;
+  }
+
+  public void setTestDataConfigs(Map<String, String> testDataConfigs) {
+    this.testDataConfigs = testDataConfigs;
+  }
+
+  public Map<String, String> getMethodConfigs() {
+    return methodConfigs;
+  }
+
+  public void setMethodConfigs(Map<String, String> methodConfigs) {
+    this.methodConfigs = methodConfigs;
+  }
+
+  public int getTrainDataSize() {
+    return trainDataSize;
+  }
+
+  public void setTrainDataSize(int trainDataSize) {
+    this.trainDataSize = trainDataSize;
+  }
+
+  public int getTestDataSize() {
+    return testDataSize;
+  }
+
+  public void setTestDataSize(int testDataSize) {
+    this.testDataSize = testDataSize;
+  }
+
+  public List<String> getMethods() {
+    return methods;
+  }
+
+  public void setMethods(List<String> methods) {
+    this.methods = methods;
+  }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyTester.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyTester.java
new file mode 100644
index 0000000..bff8120
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricAnomalyTester.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.seriesgenerator.MetricSeriesGeneratorFactory;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class MetricAnomalyTester {
+
+  public static String appId = MetricsCollectorInterface.serviceName;
+  static final Log LOG = LogFactory.getLog(MetricAnomalyTester.class);
+  static Map<String, TimelineMetric> timelineMetricMap = new HashMap<>();
+
+  public static TimelineMetrics runTestAnomalyRequest(MetricAnomalyDetectorTestInput input) throws UnknownHostException {
+
+    long currentTime = System.currentTimeMillis();
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    String hostname = InetAddress.getLocalHost().getHostName();
+
+    //Train data
+    TimelineMetric metric1 = new TimelineMetric();
+    if (StringUtils.isNotEmpty(input.getTrainDataName())) {
+      metric1 = timelineMetricMap.get(input.getTrainDataName());
+      if (metric1 == null) {
+        metric1 = new TimelineMetric();
+        double[] trainSeries = MetricSeriesGeneratorFactory.generateSeries(input.getTrainDataType(), input.getTrainDataSize(), input.getTrainDataConfigs());
+        metric1.setMetricName(input.getTrainDataName());
+        metric1.setAppId(appId);
+        metric1.setHostName(hostname);
+        metric1.setStartTime(currentTime);
+        metric1.setInstanceId(null);
+        metric1.setMetricValues(getAsTimeSeries(currentTime, trainSeries));
+        timelineMetricMap.put(input.getTrainDataName(), metric1);
+      }
+      timelineMetrics.getMetrics().add(metric1);
+    } else {
+      LOG.error("No train data name specified");
+    }
+
+    //Test data
+    TimelineMetric metric2 = new TimelineMetric();
+    if (StringUtils.isNotEmpty(input.getTestDataName())) {
+      metric2 = timelineMetricMap.get(input.getTestDataName());
+      if (metric2 == null) {
+        metric2 = new TimelineMetric();
+        double[] testSeries = MetricSeriesGeneratorFactory.generateSeries(input.getTestDataType(), input.getTestDataSize(), input.getTestDataConfigs());
+        metric2.setMetricName(input.getTestDataName());
+        metric2.setAppId(appId);
+        metric2.setHostName(hostname);
+        metric2.setStartTime(currentTime);
+        metric2.setInstanceId(null);
+        metric2.setMetricValues(getAsTimeSeries(currentTime, testSeries));
+        timelineMetricMap.put(input.getTestDataName(), metric2);
+      }
+      timelineMetrics.getMetrics().add(metric2);
+    } else {
+      LOG.warn("No test data name specified");
+    }
+
+    //Invoke method
+    if (CollectionUtils.isNotEmpty(input.getMethods())) {
+      RFunctionInvoker.setScriptsDir("/etc/ambari-metrics-collector/conf/R-scripts");
+      for (String methodType : input.getMethods()) {
+        ResultSet result = RFunctionInvoker.executeMethod(methodType, getAsDataSeries(metric1), getAsDataSeries(metric2), input.getMethodConfigs());
+        TimelineMetric timelineMetric = getAsTimelineMetric(result, methodType, input, currentTime, hostname);
+        if (timelineMetric != null) {
+          timelineMetrics.getMetrics().add(timelineMetric);
+        }
+      }
+    } else {
+      LOG.warn("No anomaly method requested");
+    }
+
+    return timelineMetrics;
+  }
+
+
+  private static TimelineMetric getAsTimelineMetric(ResultSet result, String methodType, MetricAnomalyDetectorTestInput input, long currentTime, String hostname) {
+
+    if (result == null) {
+      return null;
+    }
+
+    TimelineMetric timelineMetric = new TimelineMetric();
+    if (methodType.equals("tukeys") || methodType.equals("ema")) {
+      timelineMetric.setMetricName(input.getTrainDataName() + "_" + input.getTestDataName() + "_" + methodType + "_" + currentTime);
+      timelineMetric.setHostName(hostname);
+      timelineMetric.setAppId(appId);
+      timelineMetric.setInstanceId(null);
+      timelineMetric.setStartTime(currentTime);
+
+      TreeMap<Long, Double> metricValues = new TreeMap<>();
+      if (result.resultset.size() > 0) {
+        double[] ts = result.resultset.get(0);
+        double[] metrics = result.resultset.get(1);
+        for (int i = 0; i < ts.length; i++) {
+          if (i == 0) {
+            timelineMetric.setStartTime((long) ts[i]);
+          }
+          metricValues.put((long) ts[i], metrics[i]);
+        }
+      }
+      timelineMetric.setMetricValues(metricValues);
+      return timelineMetric;
+    }
+    return null;
+  }
+
+
+  private static TreeMap<Long, Double> getAsTimeSeries(long currentTime, double[] values) {
+
+    long startTime = currentTime - (values.length - 1) * 60 * 1000;
+    TreeMap<Long, Double> metricValues = new TreeMap<>();
+
+    for (int i = 0; i < values.length; i++) {
+      metricValues.put(startTime, values[i]);
+      startTime += (60 * 1000);
+    }
+    return metricValues;
+  }
+
+  private static DataSeries getAsDataSeries(TimelineMetric timelineMetric) {
+
+    TreeMap<Long, Double> metricValues = timelineMetric.getMetricValues();
+    double[] timestamps = new double[metricValues.size()];
+    double[] values = new double[metricValues.size()];
+    int i = 0;
+
+    for (Long timestamp : metricValues.keySet()) {
+      timestamps[i] = timestamp;
+      values[i++] = metricValues.get(timestamp);
+    }
+    return new DataSeries(timelineMetric.getMetricName() + "_" + timelineMetric.getAppId() + "_" + timelineMetric.getHostName(), timestamps, values);
+  }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricKafkaProducer.java
similarity index 56%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricKafkaProducer.java
index daaee5c..8023d15 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricKafkaProducer.java
@@ -15,25 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ambari.metrics.alertservice.spark;
+package org.apache.ambari.metrics.alertservice.prototype;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
-import org.apache.kafka.clients.producer.*;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.Properties;
-import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-public class AmsKafkaProducer {
+public class MetricKafkaProducer {
 
     Producer producer;
     private static String topicName = "ambari-metrics-topic";
 
-    public AmsKafkaProducer(String kafkaServers) {
+    public MetricKafkaProducer(String kafkaServers) {
         Properties configProperties = new Properties();
         configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); //"avijayan-ams-2.openstacklocal:6667"
         configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
@@ -51,42 +53,4 @@ public class AmsKafkaProducer {
         System.out.println(kafkaFuture.isDone());
         System.out.println(kafkaFuture.get().topic());
     }
-
-    public static void main(String[] args) throws ExecutionException, InterruptedException {
-        final long now = System.currentTimeMillis();
-
-        TimelineMetrics timelineMetrics = new TimelineMetrics();
-        TimelineMetric metric1 = new TimelineMetric();
-        metric1.setMetricName("mem_free");
-        metric1.setHostName("avijayan-ams-3.openstacklocal");
-        metric1.setTimestamp(now);
-        metric1.setStartTime(now - 1000);
-        metric1.setAppId("HOST");
-        metric1.setType("Integer");
-
-        TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
-
-        for (int i = 0; i<20;i++) {
-            double metric = 20000 + Math.random();
-            metricValues.put(now - i*100, metric);
-        }
-
-        metric1.setMetricValues(metricValues);
-
-//        metric1.setMetricValues(new TreeMap<Long, Double>() {{
-//            put(now - 100, 1.20);
-//            put(now - 200, 11.25);
-//            put(now - 300, 1.30);
-//            put(now - 400, 4.50);
-//            put(now - 500, 16.35);
-//            put(now - 400, 5.50);
-//        }});
-
-        timelineMetrics.getMetrics().add(metric1);
-
-        for (int i = 0; i<1; i++) {
-            new AmsKafkaProducer("avijayan-ams-2.openstacklocal:6667").sendMetrics(timelineMetrics);
-            Thread.sleep(1000);
-        }
-    }
 }
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
new file mode 100644
index 0000000..7735d6c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+import scala.Tuple2;
+
+import java.util.*;
+
+/**
+ * Spark Streaming driver that consumes {@link TimelineMetrics} JSON messages from the
+ * {@code ambari-metrics-topic} Kafka topic, keeps only the requested app ids, runs the EMA test
+ * on every metric and publishes the resulting anomalies through {@link MetricsCollectorInterface}.
+ * Tukeys, KS and Hsdev checks are triggered from the same loop once their intervals have elapsed.
+ */
+public class MetricSparkConsumer {
+
+  private static final Log LOG = LogFactory.getLog(MetricSparkConsumer.class);
+  // main() reads the positional arguments args[0] .. args[16].
+  private static final int REQUIRED_ARG_COUNT = 17;
+  private static String groupId = "ambari-metrics-group";
+  private static String topicName = "ambari-metrics-topic";
+  private static int numThreads = 1;
+  // Start of the current Tukeys / KS / Hsdev window; advanced by one interval per run.
+  private static long pitStartTime = System.currentTimeMillis();
+  private static long ksStartTime = pitStartTime;
+  private static long hdevStartTime = ksStartTime;
+
+  public MetricSparkConsumer() {
+  }
+
+  /**
+   * Starts the streaming job and blocks until it terminates.
+   *
+   * <p>All 17 arguments are positional. The numeric tuning arguments (indexes 5-9 and 11-14)
+   * may be passed as empty strings, in which case the built-in defaults are used.
+   *
+   * @param args appIds (comma separated), collector host, collector port, collector protocol,
+   *             zkQuorum, emaW, emaN, tukeysN, pitTestInterval, pitTrainInterval, fileName,
+   *             ksTestInterval, ksTrainInterval, hsdevNhp, hsdevInterval, ambariServerHost,
+   *             clusterName
+   * @throws InterruptedException if the wait for termination is interrupted
+   */
+  public static void main(String[] args) throws InterruptedException {
+
+    // Every index up to args[16] is read below, so fewer arguments can only end in an
+    // ArrayIndexOutOfBoundsException; fail early with the full usage instead.
+    if (args.length < REQUIRED_ARG_COUNT) {
+      System.err.println("Usage: MetricSparkConsumer <appid1,appid2> <collector_host> <port> <protocol> <zkQuorum>"
+        + " <emaW> <emaN> <tukeysN> <pitTestInterval> <pitTrainInterval> <fileName>"
+        + " <ksTestInterval> <ksTrainInterval> <hsdevNhp> <hsdevInterval> <ambariServerHost> <clusterName>");
+      System.exit(1);
+    }
+
+    List<String> appIds = Arrays.asList(args[0].split(","));
+    String collectorHost = args[1];
+    String collectorPort = args[2];
+    String collectorProtocol = args[3];
+    String zkQuorum = args[4];
+
+    double emaW = StringUtils.isNotEmpty(args[5]) ? Double.parseDouble(args[5]) : 0.5;
+    // The emptiness check must look at the same argument that is parsed (was args[8]).
+    double emaN = StringUtils.isNotEmpty(args[6]) ? Double.parseDouble(args[6]) : 3;
+    double tukeysN = StringUtils.isNotEmpty(args[7]) ? Double.parseDouble(args[7]) : 3;
+
+    long pitTestInterval = StringUtils.isNotEmpty(args[8]) ? Long.parseLong(args[8]) : 5 * 60 * 1000;
+    long pitTrainInterval = StringUtils.isNotEmpty(args[9]) ? Long.parseLong(args[9]) : 15 * 60 * 1000;
+
+    String fileName = args[10];
+    long ksTestInterval = StringUtils.isNotEmpty(args[11]) ? Long.parseLong(args[11]) : 10 * 60 * 1000;
+    long ksTrainInterval = StringUtils.isNotEmpty(args[12]) ? Long.parseLong(args[12]) : 10 * 60 * 1000;
+    int hsdevNhp = StringUtils.isNotEmpty(args[13]) ? Integer.parseInt(args[13]) : 3;
+    long hsdevInterval = StringUtils.isNotEmpty(args[14]) ? Long.parseLong(args[14]) : 30 * 60 * 1000;
+
+    String ambariServerHost = args[15];
+    String clusterName = args[16];
+
+    MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort);
+
+    SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector");
+
+    // Micro-batch duration of 10 seconds.
+    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
+
+    EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN);
+    PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(metricsCollectorInterface,
+      tukeysN,
+      pitTestInterval,
+      pitTrainInterval,
+      ambariServerHost,
+      clusterName);
+
+    TrendADSystem trendADSystem = new TrendADSystem(metricsCollectorInterface,
+      ksTestInterval,
+      ksTrainInterval,
+      hsdevNhp,
+      fileName);
+
+    Broadcast<EmaTechnique> emaTechniqueBroadcast = jssc.sparkContext().broadcast(emaTechnique);
+    Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast = jssc.sparkContext().broadcast(pointInTimeADSystem);
+    Broadcast<TrendADSystem> trendADSystemBroadcast = jssc.sparkContext().broadcast(trendADSystem);
+    Broadcast<MetricsCollectorInterface> metricsCollectorInterfaceBroadcast = jssc.sparkContext().broadcast(metricsCollectorInterface);
+
+    JavaPairReceiverInputDStream<String, String> messages =
+      KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));
+
+    //Convert JSON string to TimelineMetrics.
+    JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() {
+      @Override
+      public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
+        ObjectMapper mapper = new ObjectMapper();
+        TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class);
+        return metrics;
+      }
+    });
+
+    timelineMetricsStream.print();
+
+    //Group TimelineMetric by AppId.
+    // A message is keyed by the app id of its first metric; empty messages get the "TEST" key.
+    JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
+      timelineMetrics -> timelineMetrics.getMetrics().isEmpty()  ?  new Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics)
+    );
+
+    appMetricStream.print();
+
+    //Filter AppIds that are not needed.
+    JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
+      @Override
+      public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
+        return appIds.contains(appMetricTuple._1);
+      }
+    });
+
+    filteredAppMetricStream.print();
+
+    // NOTE(review): the *StartTime fields are statics mutated from inside rdd.foreach; when the
+    // job runs on a cluster each executor JVM presumably keeps its own copy - confirm intended.
+    filteredAppMetricStream.foreachRDD(rdd -> {
+      rdd.foreach(
+        tuple2 -> {
+          long currentTime = System.currentTimeMillis();
+          EmaTechnique ema = emaTechniqueBroadcast.getValue();
+          if (currentTime > pitStartTime + pitTestInterval) {
+            LOG.info("Running Tukeys....");
+            pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime);
+            pitStartTime = pitStartTime + pitTestInterval;
+          }
+
+          if (currentTime > ksStartTime + ksTestInterval) {
+            LOG.info("Running KS Test....");
+            trendADSystemBroadcast.getValue().runKSTest(currentTime);
+            ksStartTime = ksStartTime + ksTestInterval;
+          }
+
+          if (currentTime > hdevStartTime + hsdevInterval) {
+            LOG.info("Running HSdev Test....");
+            trendADSystemBroadcast.getValue().runHsdevMethod();
+            hdevStartTime = hdevStartTime + hsdevInterval;
+          }
+
+          // The EMA test runs on every metric of every message, independent of the windows above.
+          TimelineMetrics metrics = tuple2._2();
+          for (TimelineMetric timelineMetric : metrics.getMetrics()) {
+            List<MetricAnomaly> anomalies = ema.test(timelineMetric);
+            metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
+          }
+        });
+    });
+
+    jssc.start();
+    jssc.awaitTermination();
+  }
+}
+
+
+
+
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
new file mode 100644
index 0000000..7b3f63d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeMap;
+
+public class MetricsCollectorInterface implements Serializable {
+
+  private static String hostName = null;
+  private String instanceId = null;
+  public final static String serviceName = "anomaly-engine";
+  private String collectorHost;
+  private String protocol;
+  private String port;
+  private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
+  private static final Log LOG = LogFactory.getLog(MetricsCollectorInterface.class);
+  private static ObjectMapper mapper;
+  private final static ObjectReader timelineObjectReader;
+
+  static {
+    mapper = new ObjectMapper();
+    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+    mapper.setAnnotationIntrospector(introspector);
+    mapper.getSerializationConfig()
+      .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+    timelineObjectReader = mapper.reader(TimelineMetrics.class);
+  }
+
+  public MetricsCollectorInterface(String collectorHost, String protocol, String port) {
+    this.collectorHost = collectorHost;
+    this.protocol = protocol;
+    this.port = port;
+    this.hostName = getDefaultLocalHostName();
+  }
+
+  public static String getDefaultLocalHostName() {
+
+    if (hostName != null) {
+      return hostName;
+    }
+
+    try {
+      return InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException e) {
+      LOG.info("Error getting host address");
+    }
+    return null;
+  }
+
+  public void publish(List<MetricAnomaly> metricAnomalies) {
+    if (CollectionUtils.isNotEmpty(metricAnomalies)) {
+      LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
+      List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
+      if (!metricList.isEmpty()) {
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.setMetrics(metricList);
+        emitMetrics(timelineMetrics);
+      }
+    } else {
+      LOG.info("No anomalies to send.");
+    }
+  }
+
+  private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
+    List<TimelineMetric> metrics = new ArrayList<>();
+
+    if (metricAnomalies.isEmpty()) {
+      return metrics;
+    }
+
+    for (MetricAnomaly anomaly : metricAnomalies) {
+      TimelineMetric timelineMetric = new TimelineMetric();
+      timelineMetric.setMetricName(anomaly.getMetricKey());
+      timelineMetric.setAppId(serviceName + "-" + anomaly.getMethodType());
+      timelineMetric.setInstanceId(null);
+      timelineMetric.setHostName(getDefaultLocalHostName());
+      timelineMetric.setStartTime(anomaly.getTimestamp());
+      HashMap<String, String> metadata = new HashMap<>();
+      metadata.put("method", anomaly.getMethodType());
+      metadata.put("anomaly-score", String.valueOf(anomaly.getAnomalyScore()));
+      timelineMetric.setMetadata(metadata);
+      TreeMap<Long,Double> metricValues = new TreeMap<>();
+      metricValues.put(anomaly.getTimestamp(), anomaly.getMetricValue());
+      timelineMetric.setMetricValues(metricValues);
+
+      metrics.add(timelineMetric);
+    }
+    return metrics;
+  }
+
+  public boolean emitMetrics(TimelineMetrics metrics) {
+    String connectUrl = constructTimelineMetricUri();
+    String jsonData = null;
+    LOG.info("EmitMetrics connectUrl = " + connectUrl);
+    try {
+      jsonData = mapper.writeValueAsString(metrics);
+      LOG.info(jsonData);
+    } catch (IOException e) {
+      LOG.error("Unable to parse metrics", e);
+    }
+    if (jsonData != null) {
+      return emitMetricsJson(connectUrl, jsonData);
+    }
+    return false;
+  }
+
+  private HttpURLConnection getConnection(String spec) throws IOException {
+    return (HttpURLConnection) new URL(spec).openConnection();
+  }
+
+  private boolean emitMetricsJson(String connectUrl, String jsonData) {
+    int timeout = 10000;
+    HttpURLConnection connection = null;
+    try {
+      if (connectUrl == null) {
+        throw new IOException("Unknown URL. Unable to connect to metrics collector.");
+      }
+      connection = getConnection(connectUrl);
+
+      connection.setRequestMethod("POST");
+      connection.setRequestProperty("Content-Type", "application/json");
+      connection.setRequestProperty("Connection", "Keep-Alive");
+      connection.setConnectTimeout(timeout);
+      connection.setReadTimeout(timeout);
+      connection.setDoOutput(true);
+
+      if (jsonData != null) {
+        try (OutputStream os = connection.getOutputStream()) {
+          os.write(jsonData.getBytes("UTF-8"));
+        }
+      }
+
+      int statusCode = connection.getResponseCode();
+
+      if (statusCode != 200) {
+        LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
+          "statusCode = " + statusCode);
+      } else {
+        LOG.info("Metrics posted to Collector " + connectUrl);
+      }
+      return true;
+    } catch (IOException ioe) {
+      LOG.error(ioe.getMessage());
+    }
+    return false;
+  }
+
+  private String constructTimelineMetricUri() {
+    StringBuilder sb = new StringBuilder(protocol);
+    sb.append("://");
+    sb.append(collectorHost);
+    sb.append(":");
+    sb.append(port);
+    sb.append(WS_V1_TIMELINE_METRICS);
+    return sb.toString();
+  }
+
+  public TimelineMetrics fetchMetrics(String metricName,
+                                      String appId,
+                                      String hostname,
+                                      long startime,
+                                      long endtime) {
+
+    String url = constructTimelineMetricUri() + "?metricNames=" + metricName + "&appId=" + appId +
+      "&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + endtime;
+    LOG.info("Fetch metrics URL : " + url);
+
+    URL obj = null;
+    BufferedReader in = null;
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+    try {
+      obj = new URL(url);
+      HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+      con.setRequestMethod("GET");
+      int responseCode = con.getResponseCode();
+      LOG.info("Sending 'GET' request to URL : " + url);
+      LOG.info("Response Code : " + responseCode);
+
+      in = new BufferedReader(
+        new InputStreamReader(con.getInputStream()));
+      timelineMetrics = timelineObjectReader.readValue(in);
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      if (in != null) {
+        try {
+          in.close();
+        } catch (IOException e) {
+          LOG.warn(e);
+        }
+      }
+    }
+
+    LOG.info("Fetched " + timelineMetrics.getMetrics().size() + " metrics.");
+    return timelineMetrics;
+  }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
new file mode 100644
index 0000000..b4a8593
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaModel;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class PointInTimeADSystem implements Serializable {
+
+  //private EmaTechnique emaTechnique;
+  private MetricsCollectorInterface metricsCollectorInterface;
+  private Map<String, Double> tukeysNMap;
+  private double defaultTukeysN = 3;
+
+  private long testIntervalMillis = 5*60*1000; //10mins
+  private long trainIntervalMillis = 15*60*1000; //1hour
+
+  private static final Log LOG = LogFactory.getLog(PointInTimeADSystem.class);
+
+  private AmbariServerInterface ambariServerInterface;
+  private int sensitivity = 50;
+  private int minSensitivity = 0;
+  private int maxSensitivity = 10;
+
+  public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN,
+                             long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) {
+    this.metricsCollectorInterface = metricsCollectorInterface;
+    this.defaultTukeysN = defaultTukeysN;
+    this.tukeysNMap = new HashMap<>();
+    this.testIntervalMillis = testIntervalMillis;
+    this.trainIntervalMillis = trainIntervalMillis;
+    this.ambariServerInterface = new AmbariServerInterface(ambariServerHost, clusterName);
+    LOG.info("Starting PointInTimeADSystem...");
+  }
+
+  public void runTukeysAndRefineEma(EmaTechnique emaTechnique, long startTime) {
+    LOG.info("Running Tukeys for test data interval [" + new Date(startTime - testIntervalMillis) + " : " + new Date(startTime) + "], with train data period [" + new Date(startTime  - testIntervalMillis - trainIntervalMillis) + " : " + new Date(startTime - testIntervalMillis) + "]");
+
+    int requiredSensivity = ambariServerInterface.getPointInTimeSensitivity();
+    if (requiredSensivity == -1 || requiredSensivity == sensitivity) {
+      LOG.info("No change in sensitivity needed.");
+    } else {
+      LOG.info("Current tukey's N value = " + defaultTukeysN);
+      if (requiredSensivity > sensitivity) {
+        int targetSensitivity = Math.min(maxSensitivity, requiredSensivity);
+        while (sensitivity < targetSensitivity) {
+          defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.1;
+          sensitivity++;
+        }
+      } else {
+        int targetSensitivity = Math.max(minSensitivity, requiredSensivity);
+        while (sensitivity > targetSensitivity) {
+          defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.1;
+          sensitivity--;
+        }
+      }
+      LOG.info("New tukey's N value = " + defaultTukeysN);
+    }
+
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    for (String metricKey : emaTechnique.getTrackedEmas().keySet()) {
+      LOG.info("EMA key = " + metricKey);
+      EmaModel emaModel = emaTechnique.getTrackedEmas().get(metricKey);
+      String metricName = emaModel.getMetricName();
+      String appId = emaModel.getAppId();
+      String hostname = emaModel.getHostname();
+
+      TimelineMetrics tukeysData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, startTime - (testIntervalMillis + trainIntervalMillis),
+        startTime);
+
+      if (tukeysData.getMetrics().isEmpty()) {
+        LOG.info("No metrics fetched for Tukeys, metricKey = " + metricKey);
+        continue;
+      }
+
+      List<Double> trainTsList = new ArrayList<>();
+      List<Double> trainDataList = new ArrayList<>();
+      List<Double> testTsList = new ArrayList<>();
+      List<Double> testDataList = new ArrayList<>();
+
+      for (TimelineMetric metric : tukeysData.getMetrics()) {
+        for (Long timestamp : metric.getMetricValues().keySet()) {
+          if (timestamp <= (startTime - testIntervalMillis)) {
+            trainDataList.add(metric.getMetricValues().get(timestamp));
+            trainTsList.add((double)timestamp);
+          } else {
+            testDataList.add(metric.getMetricValues().get(timestamp));
+            testTsList.add((double)timestamp);
+          }
+        }
+      }
+
+      if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+        LOG.info("Not enough train/test data to perform analysis.");
+        continue;
+      }
+
+      String tukeysTrainSeries = "tukeysTrainSeries";
+      double[] trainTs = new double[trainTsList.size()];
+      double[] trainData = new double[trainTsList.size()];
+      for (int i = 0; i < trainTs.length; i++) {
+        trainTs[i] = trainTsList.get(i);
+        trainData[i] = trainDataList.get(i);
+      }
+
+      String tukeysTestSeries = "tukeysTestSeries";
+      double[] testTs = new double[testTsList.size()];
+      double[] testData = new double[testTsList.size()];
+      for (int i = 0; i < testTs.length; i++) {
+        testTs[i] = testTsList.get(i);
+        testData[i] = testDataList.get(i);
+      }
+
+      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
+
+      DataSeries tukeysTrainData = new DataSeries(tukeysTrainSeries, trainTs, trainData);
+      DataSeries tukeysTestData = new DataSeries(tukeysTestSeries, testTs, testData);
+
+      if (!tukeysNMap.containsKey(metricKey)) {
+        tukeysNMap.put(metricKey, defaultTukeysN);
+      }
+
+      Map<String, String> configs = new HashMap<>();
+      configs.put("tukeys.n", String.valueOf(tukeysNMap.get(metricKey)));
+
+      ResultSet rs = RFunctionInvoker.tukeys(tukeysTrainData, tukeysTestData, configs);
+
+      List<TimelineMetric> tukeysMetrics = getAsTimelineMetric(rs, metricName, appId, hostname);
+      LOG.info("Tukeys anomalies size : " + tukeysMetrics.size());
+      TreeMap<Long, Double> tukeysMetricValues = new TreeMap<>();
+
+      for (TimelineMetric tukeysMetric : tukeysMetrics) {
+        tukeysMetricValues.putAll(tukeysMetric.getMetricValues());
+        timelineMetrics.addOrMergeTimelineMetric(tukeysMetric);
+      }
+
+      TimelineMetrics emaData = metricsCollectorInterface.fetchMetrics(metricKey, MetricsCollectorInterface.serviceName+"-ema", MetricsCollectorInterface.getDefaultLocalHostName(), startTime - testIntervalMillis, startTime);
+      TreeMap<Long, Double> emaMetricValues = new TreeMap();
+      if (!emaData.getMetrics().isEmpty()) {
+        emaMetricValues = emaData.getMetrics().get(0).getMetricValues();
+      }
+
+      LOG.info("Ema anomalies size : " + emaMetricValues.size());
+      int tp = 0;
+      int tn = 0;
+      int fp = 0;
+      int fn = 0;
+
+      for (double ts : testTs) {
+        long timestamp = (long) ts;
+        if (tukeysMetricValues.containsKey(timestamp)) {
+          if (emaMetricValues.containsKey(timestamp)) {
+            tp++;
+          } else {
+            fn++;
+          }
+        } else {
+          if (emaMetricValues.containsKey(timestamp)) {
+            fp++;
+          } else {
+            tn++;
+          }
+        }
+      }
+
+      double recall = (double) tp / (double) (tp + fn);
+      double precision = (double) tp / (double) (tp + fp);
+      LOG.info("----------------------------");
+      LOG.info("Precision Recall values for " + metricKey);
+      LOG.info("tp=" + tp + ", fp=" + fp + ", tn=" + tn + ", fn=" + fn);
+      LOG.info("----------------------------");
+
+      if (recall < 0.5) {
+        LOG.info("Increasing EMA sensitivity by 10%");
+        emaModel.updateModel(true, 10);
+      } else if (precision < 0.5) {
+        LOG.info("Decreasing EMA sensitivity by 10%");
+        emaModel.updateModel(false, 10);
+      }
+
+    }
+
+    if (emaTechnique.getTrackedEmas().isEmpty()){
+      LOG.info("No EMA Technique keys tracked!!!!");
+    }
+
+    if (!timelineMetrics.getMetrics().isEmpty()) {
+      metricsCollectorInterface.emitMetrics(timelineMetrics);
+    }
+  }
+
+  private static List<TimelineMetric> getAsTimelineMetric(ResultSet result, String metricName, String appId, String hostname) {
+
+    List<TimelineMetric> timelineMetrics = new ArrayList<>();
+
+    if (result == null) {
+      LOG.info("ResultSet from R call is null!!");
+      return null;
+    }
+
+    if (result.resultset.size() > 0) {
+      double[] ts = result.resultset.get(0);
+      double[] metrics = result.resultset.get(1);
+      double[] anomalyScore = result.resultset.get(2);
+      for (int i = 0; i < ts.length; i++) {
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(metricName + "_" + appId + "_" + hostname);
+        timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+        timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys");
+        timelineMetric.setInstanceId(null);
+        timelineMetric.setStartTime((long) ts[i]);
+        TreeMap<Long, Double> metricValues = new TreeMap<>();
+        metricValues.put((long) ts[i], metrics[i]);
+
+        HashMap<String, String> metadata = new HashMap<>();
+        metadata.put("method", "tukeys");
+        metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
+        timelineMetric.setMetadata(metadata);
+
+        timelineMetric.setMetricValues(metricValues);
+        timelineMetrics.add(timelineMetric);
+      }
+    }
+
+    return timelineMetrics;
+  }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java
new file mode 100644
index 0000000..4fdf27d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.rosuda.JRI.REXP;
+import org.rosuda.JRI.RVector;
+import org.rosuda.JRI.Rengine;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static entry points for running the anomaly detection R scripts through an embedded JRI engine.
+ * <p>
+ * Every technique follows the same steps: source its script from the configured scripts
+ * directory, push the numeric parameters and the train/test series into the R session, call the
+ * R function and convert the returned vectors into a {@link ResultSet}. All techniques share the
+ * single static {@link #r} engine. Each technique returns null when anything fails.
+ */
+public class RFunctionInvoker {
+
+  static final Log LOG = LogFactory.getLog(RFunctionInvoker.class);
+  public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
+  private static String rScriptDir = "/usr/lib/ambari-metrics-collector/R-scripts";
+
+  /**
+   * Publishes the train and test series to the R session as the data frames
+   * {@code train_data} and {@code test_data} (timestamp column first, value column second).
+   */
+  private static void loadDataSets(Rengine r, DataSeries trainData, DataSeries testData) {
+    // NOTE(review): the series name is spliced into the R expression without quotes, so R
+    // presumably resolves it as a variable rather than a string literal - confirm that callers
+    // pass a value that is valid in that position.
+    r.assign("train_ts", trainData.ts);
+    r.assign("train_x", trainData.values);
+    r.eval("train_data <- data.frame(train_ts,train_x)");
+    r.eval("names(train_data) <- c(\"TS\", " + trainData.seriesName + ")");
+
+    r.assign("test_ts", testData.ts);
+    r.assign("test_x", testData.values);
+    r.eval("test_data <- data.frame(test_ts,test_x)");
+    r.eval("names(test_data) <- c(\"TS\", " + testData.seriesName + ")");
+  }
+
+  /**
+   * Evaluates {@code source('<scripts dir>/<scriptName>')} in the R session.
+   */
+  private static void sourceScript(String scriptName) {
+    r.eval("source('" + rScriptDir + "/" + scriptName + "')");
+  }
+
+  /**
+   * Reads the R variable {@code variableName} and converts each of its elements to a double array.
+   *
+   * @return the converted result, or null when the variable could not be evaluated
+   */
+  private static ResultSet collectResult(String variableName) {
+    REXP exp = r.eval(variableName);
+    if (exp == null) {
+      // Report the failed evaluation explicitly instead of tripping over a null below.
+      LOG.error("R evaluation of '" + variableName + "' returned no result");
+      return null;
+    }
+    RVector cont = (RVector) exp.getContent();
+    if (cont == null) {
+      LOG.error("R variable '" + variableName + "' has no content");
+      return null;
+    }
+    List<double[]> result = new ArrayList<>();
+    for (int i = 0; i < cont.size(); i++) {
+      result.add(cont.at(i).asDoubleArray());
+    }
+    return new ResultSet(result);
+  }
+
+  /**
+   * Overrides the directory the R scripts are sourced from.
+   *
+   * @param dir directory containing the {@code .r} scripts
+   */
+  public static void setScriptsDir(String dir) {
+    rScriptDir = dir;
+  }
+
+  /**
+   * Dispatches to the technique selected by {@code methodType}.
+   *
+   * @param methodType one of "tukeys", "ema", "ks" or "hsdev"; any other value (including null)
+   *                   falls back to "tukeys"
+   * @param trainData  training series
+   * @param testData   test series
+   * @param configs    technique parameters, keyed as documented on the individual methods
+   * @return the result of the selected technique, or null when it failed
+   */
+  public static ResultSet executeMethod(String methodType, DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+
+    if (methodType == null) {
+      // A switch on a null String throws; use the default technique instead.
+      return tukeys(trainData, testData, configs);
+    }
+
+    switch (methodType) {
+      case "ema":
+        return ema_global(trainData, testData, configs);
+      case "ks":
+        return ksTest(trainData, testData, configs);
+      case "hsdev":
+        return hsdev(trainData, testData, configs);
+      case "tukeys":
+      default:
+        return tukeys(trainData, testData, configs);
+    }
+  }
+
+  /**
+   * Runs {@code ams_tukeys} from tukeys.r. Reads the config key "tukeys.n".
+   *
+   * @return the vectors returned by the R function, or null on any failure
+   */
+  public static ResultSet tukeys(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+    try {
+      sourceScript("tukeys.r");
+
+      double n = Double.parseDouble(configs.get("tukeys.n"));
+      r.eval("n <- " + n);
+
+      loadDataSets(r, trainData, testData);
+
+      r.eval("an <- ams_tukeys(train_data, test_data, n)");
+      return collectResult("an");
+    } catch (Exception e) {
+      LOG.error("Error running tukeys R function", e);
+    } finally {
+      // NOTE(review): end() is called on the shared engine after every invocation - confirm the
+      // engine is still usable for subsequent calls.
+      r.end();
+    }
+    return null;
+  }
+
+  /**
+   * Runs {@code ema_global} from ema.r. Reads the config keys "ema.n" and "ema.w".
+   *
+   * @return the vectors returned by the R function, or null on any failure
+   */
+  public static ResultSet ema_global(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+    try {
+      sourceScript("ema.r");
+
+      int n = Integer.parseInt(configs.get("ema.n"));
+      r.eval("n <- " + n);
+
+      double w = Double.parseDouble(configs.get("ema.w"));
+      r.eval("w <- " + w);
+
+      loadDataSets(r, trainData, testData);
+
+      r.eval("an <- ema_global(train_data, test_data, w, n)");
+      return collectResult("an");
+    } catch (Exception e) {
+      LOG.error("Error running ema_global R function", e);
+    } finally {
+      r.end();
+    }
+    return null;
+  }
+
+  /**
+   * Runs {@code ema_daily} from ema.r. Reads the config keys "ema.n" and "ema.w".
+   * This variant is not reachable through {@link #executeMethod}.
+   *
+   * @return the vectors returned by the R function, or null on any failure
+   */
+  public static ResultSet ema_daily(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+    try {
+      sourceScript("ema.r");
+
+      int n = Integer.parseInt(configs.get("ema.n"));
+      r.eval("n <- " + n);
+
+      double w = Double.parseDouble(configs.get("ema.w"));
+      r.eval("w <- " + w);
+
+      loadDataSets(r, trainData, testData);
+
+      r.eval("an <- ema_daily(train_data, test_data, w, n)");
+      return collectResult("an");
+    } catch (Exception e) {
+      LOG.error("Error running ema_daily R function", e);
+    } finally {
+      r.end();
+    }
+    return null;
+  }
+
+  /**
+   * Runs {@code ams_ks} from kstest.r. Reads the config key "ks.p_value".
+   *
+   * @return the vectors returned by the R function, or null on any failure
+   */
+  public static ResultSet ksTest(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+    try {
+      sourceScript("kstest.r");
+
+      double p_value = Double.parseDouble(configs.get("ks.p_value"));
+      r.eval("p_value <- " + p_value);
+
+      loadDataSets(r, trainData, testData);
+
+      r.eval("an <- ams_ks(train_data, test_data, p_value)");
+      return collectResult("an");
+    } catch (Exception e) {
+      LOG.error("Error running ams_ks R function", e);
+    } finally {
+      r.end();
+    }
+    return null;
+  }
+
+  /**
+   * Runs {@code hsdev_daily} from hsdev.r. Reads the config keys "hsdev.n", "hsdev.nhp",
+   * "hsdev.interval" and "hsdev.period".
+   *
+   * @return the vectors returned by the R function, or null on any failure
+   */
+  public static ResultSet hsdev(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+    try {
+      sourceScript("hsdev.r");
+
+      int n = Integer.parseInt(configs.get("hsdev.n"));
+      r.eval("n <- " + n);
+
+      int nhp = Integer.parseInt(configs.get("hsdev.nhp"));
+      r.eval("nhp <- " + nhp);
+
+      long interval = Long.parseLong(configs.get("hsdev.interval"));
+      r.eval("interval <- " + interval);
+
+      long period = Long.parseLong(configs.get("hsdev.period"));
+      r.eval("period <- " + period);
+
+      loadDataSets(r, trainData, testData);
+
+      r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)");
+      return collectResult("an2");
+    } catch (Exception e) {
+      LOG.error("Error running hsdev_daily R function", e);
+    } finally {
+      r.end();
+    }
+    return null;
+  }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java
new file mode 100644
index 0000000..7485f01
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Request describing a test series: its name, its type and a map of configuration values.
+ * <p>
+ * Two requests are equal when their series names are equal; the series type and the configs do
+ * not take part in {@link #equals(Object)} or {@link #hashCode()}.
+ */
+@XmlRootElement
+public class TestSeriesInputRequest {
+
+  private String seriesName;
+  private String seriesType;
+  private Map<String, String> configs;
+
+  /** Creates an empty request; all fields are null until set. */
+  public TestSeriesInputRequest() {
+  }
+
+  /**
+   * @param seriesName name of the series; the only field used for equality
+   * @param seriesType type of the series
+   * @param configs    configuration values for the series
+   */
+  public TestSeriesInputRequest(String seriesName, String seriesType, Map<String, String> configs) {
+    this.seriesName = seriesName;
+    this.seriesType = seriesType;
+    this.configs = configs;
+  }
+
+  public String getSeriesName() {
+    return seriesName;
+  }
+
+  public void setSeriesName(String seriesName) {
+    this.seriesName = seriesName;
+  }
+
+  public String getSeriesType() {
+    return seriesType;
+  }
+
+  public void setSeriesType(String seriesType) {
+    this.seriesType = seriesType;
+  }
+
+  public Map<String, String> getConfigs() {
+    return configs;
+  }
+
+  public void setConfigs(Map<String, String> configs) {
+    this.configs = configs;
+  }
+
+  /**
+   * Compares by series name only. Returns false for null and for objects of another type, and
+   * treats two requests without a series name as equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    // Also rejects null, which the unconditional cast used to dereference.
+    if (!(o instanceof TestSeriesInputRequest)) {
+      return false;
+    }
+    String otherName = ((TestSeriesInputRequest) o).getSeriesName();
+    String thisName = this.getSeriesName();
+    return thisName == null ? otherName == null : thisName.equals(otherName);
+  }
+
+  /**
+   * Hash of the series name, or 0 when no series name is set (e.g. after the no-arg constructor).
+   */
+  @Override
+  public int hashCode() {
+    return seriesName == null ? 0 : seriesName.hashCode();
+  }
+
+  /** Prints the JSON form of a sample request to standard output. */
+  public static void main(String[] args) {
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    TestSeriesInputRequest testSeriesInputRequest = new TestSeriesInputRequest("test", "ema", Collections.singletonMap("key","value"));
+    try {
+      System.out.print(objectMapper.writeValueAsString(testSeriesInputRequest));
+    } catch (JsonProcessingException e) {
+      e.printStackTrace();
+    }
+  }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
new file mode 100644
index 0000000..1534b55
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.hsdev.HsdevTechnique;
+import org.apache.ambari.metrics.alertservice.prototype.methods.kstest.KSTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Trend anomaly detection driver.
+ * <p>
+ * {@link #runKSTest(long)} runs the KS technique on every metric listed in the input file and
+ * remembers each anomalous run. {@link #runHsdevMethod()} then re-checks those remembered runs
+ * with the Hsdev technique, publishes the anomalies Hsdev also reports, and forgets all runs.
+ * <p>
+ * The input file holds one metric per line as {@code metricName,appId,hostname}.
+ */
+public class TrendADSystem implements Serializable {
+
+  private MetricsCollectorInterface metricsCollectorInterface;
+  private List<TrendMetric> trendMetrics;
+
+  private long ksTestIntervalMillis = 10 * 60 * 1000;
+  private long ksTrainIntervalMillis = 10 * 60 * 1000;
+  private KSTechnique ksTechnique;
+
+  private HsdevTechnique hsdevTechnique;
+  private int hsdevNumHistoricalPeriods = 3;
+
+  private Map<KsSingleRunKey, MetricAnomaly> trackedKsAnomalies = new HashMap<>();
+  private static final Log LOG = LogFactory.getLog(TrendADSystem.class);
+  private String inputFile = "";
+
+  /**
+   * @param metricsCollectorInterface used to fetch metric data and to publish anomalies
+   * @param ksTestIntervalMillis      length of the KS test window, ending at the run time
+   * @param ksTrainIntervalMillis     length of the KS train window, ending where the test window starts
+   * @param hsdevNumHistoricalPeriods number of test-window-sized periods used as Hsdev train data
+   * @param inputFileName             file listing the metrics to track; re-read on every KS run
+   */
+  public TrendADSystem(MetricsCollectorInterface metricsCollectorInterface,
+                       long ksTestIntervalMillis,
+                       long ksTrainIntervalMillis,
+                       int hsdevNumHistoricalPeriods,
+                       String inputFileName) {
+
+    this.metricsCollectorInterface = metricsCollectorInterface;
+    this.ksTestIntervalMillis = ksTestIntervalMillis;
+    this.ksTrainIntervalMillis = ksTrainIntervalMillis;
+    this.hsdevNumHistoricalPeriods = hsdevNumHistoricalPeriods;
+
+    this.ksTechnique = new KSTechnique();
+    this.hsdevTechnique = new HsdevTechnique();
+
+    trendMetrics = new ArrayList<>();
+    this.inputFile = inputFileName;
+    readInputFile(inputFileName);
+  }
+
+  /**
+   * Runs the KS test for every tracked metric over the window ending at {@code currentEndTime}.
+   * Each anomaly found is published immediately and remembered for {@link #runHsdevMethod()}.
+   *
+   * @param currentEndTime end of the test window, in epoch milliseconds
+   */
+  public void runKSTest(long currentEndTime) {
+    readInputFile(inputFile);
+
+    long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis;
+    long ksTrainIntervalStartTime = ksTestIntervalStartTime - ksTrainIntervalMillis;
+    LOG.info("Running KS Test for test data interval [" + new Date(ksTestIntervalStartTime) + " : " +
+      new Date(currentEndTime) + "], with train data period [" + new Date(ksTrainIntervalStartTime)
+      + " : " + new Date(ksTestIntervalStartTime) + "]");
+
+    for (TrendMetric metric : trendMetrics) {
+      String metricName = metric.metricName;
+      String appId = metric.appId;
+      String hostname = metric.hostname;
+      String key = metricName + "_" + appId + "_" + hostname;
+
+      TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname,
+        ksTrainIntervalStartTime, currentEndTime);
+
+      if (ksData.getMetrics().isEmpty()) {
+        LOG.info("No metrics fetched for KS, metricKey = " + key);
+        continue;
+      }
+
+      DataSeries[] trainAndTest = splitTrainAndTest(ksData, ksTestIntervalStartTime, "KSTrainSeries", "KSTestSeries");
+      if (trainAndTest == null) {
+        LOG.info("Not enough train/test data to perform KS analysis.");
+        continue;
+      }
+      DataSeries ksTrainData = trainAndTest[0];
+      DataSeries ksTestData = trainAndTest[1];
+
+      LOG.info("Train Size = " + ksTrainData.ts.length + ", Test Size = " + ksTestData.ts.length);
+
+      MetricAnomaly metricAnomaly = ksTechnique.runKsTest(key, ksTrainData, ksTestData);
+      if (metricAnomaly == null) {
+        LOG.info("No anomaly from KS test.");
+      } else {
+        LOG.info("Found Anomaly in KS Test. Publishing KS Anomaly metric....");
+        TimelineMetric timelineMetric = getAsTimelineMetric(metricAnomaly,
+          ksTestIntervalStartTime, currentEndTime, ksTrainIntervalStartTime, ksTestIntervalStartTime);
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        metricsCollectorInterface.emitMetrics(timelineMetrics);
+
+        trackedKsAnomalies.put(new KsSingleRunKey(ksTestIntervalStartTime, currentEndTime, metricName, appId, hostname), metricAnomaly);
+      }
+    }
+
+    if (trendMetrics.isEmpty()) {
+      LOG.info("No Trend metrics tracked!!!!");
+    }
+
+  }
+
+  /**
+   * Wraps an anomaly into a single-point timeline metric stamped with the test window end; the
+   * test start and the train window bounds are recorded as metadata.
+   */
+  private TimelineMetric getAsTimelineMetric(MetricAnomaly metricAnomaly,
+                                   long testStart,
+                                   long testEnd,
+                                   long trainStart,
+                                   long trainEnd) {
+
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(metricAnomaly.getMetricKey());
+    timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-" + metricAnomaly.getMethodType());
+    timelineMetric.setInstanceId(null);
+    timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+    timelineMetric.setStartTime(testEnd);
+    HashMap<String, String> metadata = new HashMap<>();
+    metadata.put("method", metricAnomaly.getMethodType());
+    metadata.put("anomaly-score", String.valueOf(metricAnomaly.getAnomalyScore()));
+    metadata.put("test-start-time", String.valueOf(testStart));
+    metadata.put("train-start-time", String.valueOf(trainStart));
+    metadata.put("train-end-time", String.valueOf(trainEnd));
+    timelineMetric.setMetadata(metadata);
+    TreeMap<Long,Double> metricValues = new TreeMap<>();
+    metricValues.put(testEnd, metricAnomaly.getMetricValue());
+    timelineMetric.setMetricValues(metricValues);
+    return timelineMetric;
+
+  }
+
+  /**
+   * Re-checks every remembered KS anomaly with the Hsdev technique. The test window is the KS
+   * test window; the train window spans {@code hsdevNumHistoricalPeriods} windows of the same
+   * length directly before it. When Hsdev reports no anomaly the KS model is updated for that
+   * metric; confirmed anomalies are published together at the end. All remembered KS runs are
+   * cleared afterwards.
+   */
+  public void runHsdevMethod() {
+
+    List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>();
+
+    for (KsSingleRunKey ksSingleRunKey : trackedKsAnomalies.keySet()) {
+
+      long hsdevTestEnd = ksSingleRunKey.endTime;
+      long hsdevTestStart = ksSingleRunKey.startTime;
+
+      long period = hsdevTestEnd - hsdevTestStart;
+
+      long hsdevTrainStart = hsdevTestStart - (hsdevNumHistoricalPeriods) * period;
+      long hsdevTrainEnd = hsdevTestStart;
+
+      LOG.info("Running HSdev Test for test data interval [" + new Date(hsdevTestStart) + " : " +
+        new Date(hsdevTestEnd) + "], with train data period [" + new Date(hsdevTrainStart)
+        + " : " + new Date(hsdevTrainEnd) + "]");
+
+      String metricName = ksSingleRunKey.metricName;
+      String appId = ksSingleRunKey.appId;
+      String hostname = ksSingleRunKey.hostname;
+      String key = metricName + "_" + appId + "_" + hostname;
+
+      TimelineMetrics hsdevData = metricsCollectorInterface.fetchMetrics(
+        metricName,
+        appId,
+        hostname,
+        hsdevTrainStart,
+        hsdevTestEnd);
+
+      if (hsdevData.getMetrics().isEmpty()) {
+        LOG.info("No metrics fetched for HSDev, metricKey = " + key);
+        continue;
+      }
+
+      DataSeries[] trainAndTest = splitTrainAndTest(hsdevData, hsdevTestStart, "HsdevTrainSeries", "HsdevTestSeries");
+      if (trainAndTest == null) {
+        LOG.info("Not enough train/test data to perform Hsdev analysis.");
+        continue;
+      }
+      DataSeries hsdevTrainData = trainAndTest[0];
+      DataSeries hsdevTestData = trainAndTest[1];
+
+      LOG.info("Train Size = " + hsdevTrainData.ts.length + ", Test Size = " + hsdevTestData.ts.length);
+
+      MetricAnomaly metricAnomaly = hsdevTechnique.runHsdevTest(key, hsdevTrainData, hsdevTestData);
+      if (metricAnomaly == null) {
+        LOG.info("No anomaly from Hsdev test. Mismatch between KS and HSDev. ");
+        ksTechnique.updateModel(key, false, 10);
+      } else {
+        LOG.info("Found Anomaly in Hsdev Test. This confirms KS anomaly.");
+        hsdevMetricAnomalies.add(getAsTimelineMetric(metricAnomaly,
+          hsdevTestStart, hsdevTestEnd, hsdevTrainStart, hsdevTrainEnd));
+      }
+    }
+    clearTrackedKsRunKeys();
+
+    if (!hsdevMetricAnomalies.isEmpty()) {
+      LOG.info("Publishing Hsdev Anomalies....");
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(hsdevMetricAnomalies);
+      metricsCollectorInterface.emitMetrics(timelineMetrics);
+    }
+  }
+
+  /**
+   * Splits the fetched data points into a train series (timestamp at or before
+   * {@code testStartTime}) and a test series (timestamp after it).
+   *
+   * @return a two-element array {@code {train, test}}, or null when either side is empty or the
+   *         train side holds fewer points than the test side
+   */
+  private static DataSeries[] splitTrainAndTest(TimelineMetrics fetched,
+                                                long testStartTime,
+                                                String trainSeriesName,
+                                                String testSeriesName) {
+
+    List<Double> trainTsList = new ArrayList<>();
+    List<Double> trainDataList = new ArrayList<>();
+    List<Double> testTsList = new ArrayList<>();
+    List<Double> testDataList = new ArrayList<>();
+
+    for (TimelineMetric timelineMetric : fetched.getMetrics()) {
+      for (Map.Entry<Long, Double> point : timelineMetric.getMetricValues().entrySet()) {
+        Long timestamp = point.getKey();
+        if (timestamp <= testStartTime) {
+          trainDataList.add(point.getValue());
+          trainTsList.add((double) timestamp);
+        } else {
+          testDataList.add(point.getValue());
+          testTsList.add((double) timestamp);
+        }
+      }
+    }
+
+    if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+      return null;
+    }
+
+    DataSeries train = new DataSeries(trainSeriesName, toPrimitiveArray(trainTsList), toPrimitiveArray(trainDataList));
+    DataSeries test = new DataSeries(testSeriesName, toPrimitiveArray(testTsList), toPrimitiveArray(testDataList));
+    return new DataSeries[]{train, test};
+  }
+
+  /** Copies a list of boxed doubles into a primitive array of the same length and order. */
+  private static double[] toPrimitiveArray(List<Double> values) {
+    double[] array = new double[values.size()];
+    for (int i = 0; i < array.length; i++) {
+      array[i] = values.get(i);
+    }
+    return array;
+  }
+
+  private void clearTrackedKsRunKeys() {
+    trackedKsAnomalies.clear();
+  }
+
+  /**
+   * Replaces the tracked metrics with the contents of {@code fileName}. Blank lines are ignored
+   * and lines with fewer than three comma-separated fields are skipped with a warning, so a
+   * malformed file can no longer abort construction or a KS run. Read errors are logged and
+   * leave the list with whatever was parsed before the failure.
+   */
+  private void readInputFile(String fileName) {
+    trendMetrics.clear();
+    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
+      for (String line; (line = br.readLine()) != null; ) {
+        if (line.trim().isEmpty()) {
+          continue;
+        }
+        String[] splits = line.split(",");
+        if (splits.length < 3) {
+          // Indexing splits[1] / splits[2] below would throw on such a line.
+          LOG.warn("Skipping malformed line in Trend AD input file : " + line);
+          continue;
+        }
+        LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]);
+        trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2]));
+      }
+    } catch (IOException e) {
+      // Pass the exception as the cause so the stack trace is not lost.
+      LOG.error("Error reading input file : " + e, e);
+    }
+  }
+
+  /**
+   * Identifies one KS run: the test window bounds plus the metric it ran on. The class does not
+   * override equals/hashCode, so as a map key two instances with identical fields are distinct.
+   */
+  class KsSingleRunKey implements Serializable{
+
+    long startTime;
+    long endTime;
+    String metricName;
+    String appId;
+    String hostname;
+
+    public KsSingleRunKey(long startTime, long endTime, String metricName, String appId, String hostname) {
+      this.startTime = startTime;
+      this.endTime = endTime;
+      this.metricName = metricName;
+      this.appId = appId;
+      this.hostname = hostname;
+    }
+  }
+
+  /*
+          boolean isPresent = false;
+        for (TrendMetric trendMetric : trendMetrics) {
+          if (trendMetric.metricName.equalsIgnoreCase(splits[0])) {
+            isPresent = true;
+          }
+        }
+        if (!isPresent) {
+          LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]);
+          trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2]));
+        }
+   */
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/MetricAnomalyModel.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendMetric.java
similarity index 68%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/MetricAnomalyModel.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendMetric.java
index af33d26..3bead8b 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/MetricAnomalyModel.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendMetric.java
@@ -15,15 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ambari.metrics.alertservice.methods;
+package org.apache.ambari.metrics.alertservice.prototype;
 
-import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
+import java.io.Serializable;
 
-import java.util.List;
+/**
+ * Serializable holder for the three strings that identify a tracked metric:
+ * metric name, appId and hostname. The fields are package-private and set once
+ * by the constructor.
+ */
+public class TrendMetric implements Serializable {
 
-public interface MetricAnomalyModel {
+  String metricName;
+  String appId;
+  String hostname;
 
-    public List<MetricAnomaly> onNewMetric(TimelineMetric metric);
-    public List<MetricAnomaly> test(TimelineMetric metric);
+  /**
+   * @param metricName name of the metric
+   * @param appId      appId the metric is reported under
+   * @param hostname   host the metric is reported from
+   */
+  public TrendMetric(String metricName, String appId, String hostname) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.hostname = hostname;
+  }
 }
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/DataSet.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
similarity index 77%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/DataSet.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
index a709c73..eb19857 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/DataSet.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
@@ -15,24 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ambari.metrics.alertservice.common;
+package org.apache.ambari.metrics.alertservice.prototype.common;
 
 import java.util.Arrays;
 
-public class DataSet {
+/**
+ * A named series made of two double arrays: {@code ts} and {@code values}.
+ * The arrays are stored by reference, not copied.
+ * NOTE(review): callers in this change fill {@code ts} with epoch-millisecond timestamps and
+ * keep both arrays the same length - presumably that is the contract; confirm.
+ */
+public class DataSeries {
 
-    public String metricName;
+    public String seriesName;
     public double[] ts;
     public double[] values;
 
-    public DataSet(String metricName, double[] ts, double[] values) {
-        this.metricName = metricName;
+    public DataSeries(String seriesName, double[] ts, double[] values) {
+        this.seriesName = seriesName;
         this.ts = ts;
         this.values = values;
     }
 
     @Override
     public String toString() {
-        return metricName + Arrays.toString(ts) + Arrays.toString(values);
+        // Series name immediately followed by both arrays in Arrays.toString form.
+        return seriesName + Arrays.toString(ts) + Arrays.toString(values);
     }
 }
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/ResultSet.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
similarity index 91%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/ResultSet.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
index 9415c1b..101b0e9 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/ResultSet.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ambari.metrics.alertservice.common;
+package org.apache.ambari.metrics.alertservice.prototype.common;
 
 
 import java.util.ArrayList;
@@ -23,7 +23,7 @@ import java.util.List;
 
 public class ResultSet {
 
-    List<double[]> resultset = new ArrayList<>();
+    public List<double[]> resultset = new ArrayList<>();
 
     public ResultSet(List<double[]> resultset) {
         this.resultset = resultset;
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/StatisticUtils.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
similarity index 55%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/StatisticUtils.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
index 81bd77b..4ea4ac5 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/StatisticUtils.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
@@ -15,24 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ambari.metrics.alertservice.common;
+package org.apache.ambari.metrics.alertservice.prototype.common;
 
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 
 public class StatisticUtils {
 
-  public static double mean(Collection<Double> values) {
+  /**
+   * Returns the arithmetic mean of {@code values}.
+   * An empty array yields NaN, because the zero sum is divided by a zero length.
+   */
+  public static double mean(double[] values) {
     double sum = 0;
     for (double d : values) {
       sum += d;
     }
-    return sum / values.size();
+    return sum / values.length;
   }
 
-  public static double variance(Collection<Double> values) {
+  public static double variance(double[] values) {
     double avg =  mean(values);
     double variance = 0;
     for (double d : values) {
@@ -41,37 +42,21 @@ public class StatisticUtils {
     return variance;
   }
 
-  public static double sdev(Collection<Double> values, boolean useBesselsCorrection) {
+  public static double sdev(double[]  values, boolean useBesselsCorrection) {
     double variance = variance(values);
-    int n = (useBesselsCorrection) ? values.size() - 1 : values.size();
+    int n = (useBesselsCorrection) ? values.length - 1 : values.length;
     return Math.sqrt(variance / n);
   }
 
-  public static double median(Collection<Double> values) {
-    ArrayList<Double> clonedValues = new ArrayList<Double>(values);
-    Collections.sort(clonedValues);
-    int n = values.size();
+  public static double median(double[] values) {
+    double[] clonedValues = Arrays.copyOf(values, values.length);
+    Arrays.sort(clonedValues);
+    int n = values.length;
 
     if (n % 2 != 0) {
-      return clonedValues.get((n-1)/2);
+      return clonedValues[(n-1)/2];
     } else {
-      return ( clonedValues.get((n-1)/2) + clonedValues.get(n/2) ) / 2;
+      return ( clonedValues[(n-1)/2] + clonedValues[n/2] ) / 2;
     }
   }
-
-
-
-//  public static void main(String[] args) {
-//
-//    Collection<Double> values = new ArrayList<>();
-//    values.add(1.0);
-//    values.add(2.0);
-//    values.add(3.0);
-//    values.add(4.0);
-//    values.add(5.0);
-//
-//    System.out.println(mean(values));
-//    System.out.println(sdev(values, false));
-//    System.out.println(median(values));
-//  }
 }
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaResult.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
similarity index 62%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaResult.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
index 2d24a9c..0b10b4b 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaResult.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
@@ -6,31 +6,27 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ambari.metrics.alertservice.methods.ema;
+package org.apache.ambari.metrics.alertservice.prototype.methods;
 
-import org.apache.ambari.metrics.alertservice.common.MethodResult;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 
-public class EmaResult extends MethodResult{
+import java.sql.Time;
+import java.util.List;
+import java.util.Map;
 
-    double diff;
+public abstract class AnomalyDetectionTechnique {
 
-    public EmaResult(double diff) {
-        this.methodType = "EMA";
-        this.diff = diff;
-    }
+  protected String methodType;
 
+  public abstract List<MetricAnomaly> test(TimelineMetric metric);
 
-    @Override
-    public String prettyPrint() {
-        return methodType + "(` = " + diff + ")";
-    }
 }
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
new file mode 100644
index 0000000..da4f030
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Describes one data point that an anomaly detection method has flagged.
+ * <p>
+ * Holds the key of the metric, the timestamp and value of the flagged point,
+ * the name of the method that flagged it and the score that method assigned.
+ */
+public class MetricAnomaly implements Serializable {
+
+  private String methodType;
+  private double anomalyScore;
+  private String metricKey;
+  private long timestamp;
+  private double metricValue;
+
+  /**
+   * @param metricKey    key identifying the metric series
+   * @param timestamp    timestamp of the flagged data point
+   * @param metricValue  value of the flagged data point
+   * @param methodType   name of the detection method that produced this anomaly
+   * @param anomalyScore score assigned by the detection method
+   */
+  public MetricAnomaly(String metricKey, long timestamp, double metricValue, String methodType, double anomalyScore) {
+    this.methodType = methodType;
+    this.anomalyScore = anomalyScore;
+    this.metricKey = metricKey;
+    this.timestamp = timestamp;
+    this.metricValue = metricValue;
+  }
+
+  // ---- metric key -------------------------------------------------------
+
+  public String getMetricKey() {
+    return this.metricKey;
+  }
+
+  public void setMetricKey(String metricKey) {
+    this.metricKey = metricKey;
+  }
+
+  /**
+   * Writes the same field as {@link #setMetricKey(String)}.
+   */
+  public void setMetricName(String metricName) {
+    this.metricKey = metricName;
+  }
+
+  // ---- flagged data point -----------------------------------------------
+
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  public double getMetricValue() {
+    return this.metricValue;
+  }
+
+  public void setMetricValue(double metricValue) {
+    this.metricValue = metricValue;
+  }
+
+  // ---- detection result -------------------------------------------------
+
+  public String getMethodType() {
+    return this.methodType;
+  }
+
+  public void setMethodType(String methodType) {
+    this.methodType = methodType;
+  }
+
+  public double getAnomalyScore() {
+    return this.anomalyScore;
+  }
+
+  public void setAnomalyScore(double anomalyScore) {
+    this.anomalyScore = anomalyScore;
+  }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
new file mode 100644
index 0000000..5e1f76b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods.ema;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+
+/**
+ * Exponentially weighted model of a single metric series.
+ * <p>
+ * Tracks a moving average ({@code ema}) and a moving deviation estimate
+ * ({@code ems}) of the values fed to it, and scores new values by how far they
+ * lie from the average relative to that deviation.
+ */
+@XmlRootElement
+public class EmaModel implements Serializable {
+
+  private String metricName;
+  private String hostname;
+  private String appId;
+  private double ema;
+  private double ems;
+  private double weight;
+  private double timessdev;
+
+  // Number of values seen so far through testAndUpdate().
+  private int ctr = 0;
+  // No value is tested until the counter has exceeded this threshold.
+  private static final int SUPPRESS_ANOMALIES_THRESHOLD = 30;
+
+  private static final Log LOG = LogFactory.getLog(EmaModel.class);
+
+  /**
+   * @param name      metric name
+   * @param hostname  host the metric belongs to
+   * @param appId     application id the metric belongs to
+   * @param weight    weight given to the previous average on every update
+   * @param timessdev number of deviations a value must be away from the average to be scored
+   */
+  public EmaModel(String name, String hostname, String appId, double weight, double timessdev) {
+    this.metricName = name;
+    this.hostname = hostname;
+    this.appId = appId;
+    this.weight = weight;
+    this.timessdev = timessdev;
+    this.ema = 0.0;
+    this.ems = 0.0;
+  }
+
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public String getHostname() {
+    return hostname;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  /**
+   * Scores a value and, unless the score is too extreme, folds it into the model.
+   * <p>
+   * While the call counter has not exceeded the suppression threshold the value
+   * is not tested and the score is 0. A value whose absolute score reaches
+   * {@code 2 * timessdev} is not used to update the model.
+   *
+   * @param metricValue the new value of the series
+   * @return the anomaly score, 0.0 when the value is not considered anomalous
+   */
+  public double testAndUpdate(double metricValue) {
+
+    double anomalyScore = 0.0;
+    if (ctr > SUPPRESS_ANOMALIES_THRESHOLD) {
+      anomalyScore = test(metricValue);
+    }
+    if (Math.abs(anomalyScore) < 2 * timessdev) {
+      update(metricValue);
+    } else {
+      LOG.info("Not updating model for this value");
+    }
+    ctr++;
+    LOG.info("Counter : " + ctr);
+    LOG.info("Anomaly Score for " + metricValue + " : " + anomalyScore);
+    return anomalyScore;
+  }
+
+  /**
+   * Folds a value into the moving average and the moving deviation estimate.
+   *
+   * @param metricValue the value to add
+   */
+  public void update(double metricValue) {
+    ema = weight * ema + (1 - weight) * metricValue;
+    // The deviation is computed against the already updated average.
+    ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0));
+    LOG.info("In update : ema = " + ema + ", ems = " + ems);
+  }
+
+  /**
+   * Scores a value against the current model without changing the model.
+   *
+   * @param metricValue the value to score
+   * @return the absolute z-score when the value is more than {@code timessdev}
+   * deviations away from the average, 0.0 otherwise
+   */
+  public double test(double metricValue) {
+    LOG.info("In test : ema = " + ema + ", ems = " + ems);
+    double diff = Math.abs(ema - metricValue) - (timessdev * ems);
+    LOG.info("diff = " + diff);
+    if (diff > 0) {
+      // NOTE: when ems is 0 (e.g. a constant series so far) this division yields an infinite score.
+      return Math.abs((metricValue - ema) / ems); //Z score
+    } else {
+      return 0.0;
+    }
+  }
+
+  /**
+   * Scales {@code timessdev} and {@code weight} by the given percentage.
+   * Increasing the sensitivity lowers both, decreasing it raises both.
+   * {@code weight} is kept within [0, 1] and {@code timessdev} is kept
+   * non-negative, so that a percentage above 100 cannot produce a negative
+   * weight (which would make the deviation estimate NaN) or a negative band.
+   *
+   * @param increaseSensitivity true to make the model flag more values
+   * @param percent             size of the adjustment, in percent of the current parameter values
+   */
+  public void updateModel(boolean increaseSensitivity, double percent) {
+    LOG.info("Updating model for " + metricName + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent);
+    double delta = percent / 100;
+    if (increaseSensitivity) {
+      delta = delta * -1;
+    }
+    this.timessdev = Math.max(0.0, timessdev + delta * timessdev);
+    this.weight = Math.max(0.0, Math.min(1.0, weight + delta * weight));
+    LOG.info("New model parameters " + metricName + " : timessdev = " + timessdev + ", weight = " + weight);
+  }
+
+  public double getWeight() {
+    return weight;
+  }
+
+  public void setWeight(double weight) {
+    this.weight = weight;
+  }
+
+  public double getTimessdev() {
+    return timessdev;
+  }
+
+  public void setTimessdev(double timessdev) {
+    this.timessdev = timessdev;
+  }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModelLoader.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
similarity index 64%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModelLoader.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
index 0205844..62749c1 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/methods/ema/EmaModelLoader.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
@@ -15,32 +15,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ambari.metrics.alertservice.methods.ema;
+package org.apache.ambari.metrics.alertservice.prototype.methods.ema;
 
 import com.google.gson.Gson;
-import com.sun.org.apache.commons.logging.Log;
-import com.sun.org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.spark.SparkContext;
 import org.apache.spark.mllib.util.Loader;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 
-public class EmaModelLoader implements Loader<EmaModel> {
+public class EmaModelLoader implements Loader<EmaTechnique> {
     private static final Log LOG = LogFactory.getLog(EmaModelLoader.class);
 
     @Override
-    public EmaModel load(SparkContext sc, String path) {
-        Gson gson = new Gson();
-        try {
-            String fileString = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
-            return gson.fromJson(fileString, EmaModel.class);
-        } catch (IOException e) {
-            LOG.error(e);
-        }
-        return null;
+    public EmaTechnique load(SparkContext sc, String path) {
+        return new EmaTechnique(0.5,3);
+//        Gson gson = new Gson();
+//        try {
+//            String fileString = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
+//            return gson.fromJson(fileString, EmaTechnique.class);
+//        } catch (IOException e) {
+//            LOG.error(e);
+//        }
+//        return null;
     }
 }
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
new file mode 100644
index 0000000..c005e6f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods.ema;
+
+import com.google.gson.Gson;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.AnomalyDetectionTechnique;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.spark.SparkContext;
+import org.apache.spark.mllib.util.Saveable;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Anomaly detection technique that keeps one {@link EmaModel} per metric series
+ * and scores every incoming data point against the model of its series.
+ */
+@XmlRootElement
+public class EmaTechnique extends AnomalyDetectionTechnique implements Serializable, Saveable {
+
+  // Models keyed by "<metricName>_<appId>_<hostname>".
+  @XmlElement(name = "trackedEmas")
+  private Map<String, EmaModel> trackedEmas;
+  private static final Log LOG = LogFactory.getLog(EmaTechnique.class);
+
+  private double startingWeight = 0.5;
+  private double startTimesSdev = 3.0;
+
+  /**
+   * @param startingWeight weight every newly created model starts with
+   * @param startTimesSdev deviation multiplier every newly created model starts with
+   */
+  public EmaTechnique(double startingWeight, double startTimesSdev) {
+    trackedEmas = new HashMap<>();
+    this.startingWeight = startingWeight;
+    this.startTimesSdev = startTimesSdev;
+    // Use the field inherited from AnomalyDetectionTechnique instead of declaring a second
+    // "methodType" here: a shadowing field leaves the inherited one null, and Gson refuses
+    // to serialize a class hierarchy that declares the same field name twice (see save()).
+    this.methodType = "ema";
+    LOG.info("New EmaTechnique......");
+  }
+
+  /**
+   * Scores every data point of the metric against the model of its series,
+   * creating the model on first sight of the series.
+   *
+   * @param metric the metric whose values are to be tested
+   * @return one anomaly per data point with a score above 0, in iteration order of the values
+   */
+  @Override
+  public List<MetricAnomaly> test(TimelineMetric metric) {
+    String key = buildKey(metric);
+
+    EmaModel emaModel = trackedEmas.get(key);
+    if (emaModel == null) {
+      LOG.info("EmaModel not present for " + key);
+      LOG.info("Number of tracked Emas : " + trackedEmas.size());
+      emaModel = new EmaModel(metric.getMetricName(), metric.getHostName(), metric.getAppId(),
+        startingWeight, startTimesSdev);
+      trackedEmas.put(key, emaModel);
+    } else {
+      LOG.info("EmaModel already present for " + key);
+    }
+
+    List<MetricAnomaly> anomalies = new ArrayList<>();
+
+    for (Map.Entry<Long, Double> entry : metric.getMetricValues().entrySet()) {
+      Double boxedValue = entry.getValue();
+      if (boxedValue == null) {
+        // A missing value cannot be scored; skip it instead of failing on unboxing.
+        LOG.warn("Skipping null value for : " + key);
+        continue;
+      }
+      long timestamp = entry.getKey();
+      double metricValue = boxedValue;
+      double anomalyScore = emaModel.testAndUpdate(metricValue);
+      if (anomalyScore > 0.0) {
+        LOG.info("Found anomaly for : " + key);
+        MetricAnomaly metricAnomaly = new MetricAnomaly(key, timestamp, metricValue, methodType, anomalyScore);
+        anomalies.add(metricAnomaly);
+      } else {
+        LOG.info("Discarding non-anomaly for : " + key);
+      }
+    }
+    return anomalies;
+  }
+
+  /**
+   * Adjusts the sensitivity of the model tracked for the given metric.
+   *
+   * @param timelineMetric      metric identifying the series
+   * @param increaseSensitivity true to make the model flag more values
+   * @param percent             size of the adjustment in percent
+   * @return false when no model is tracked for the series, true otherwise
+   */
+  public boolean updateModel(TimelineMetric timelineMetric, boolean increaseSensitivity, double percent) {
+    String key = buildKey(timelineMetric);
+
+    EmaModel emaModel = trackedEmas.get(key);
+
+    if (emaModel == null) {
+      LOG.warn("EMA Model for " + key + " not found");
+      return false;
+    }
+    emaModel.updateModel(increaseSensitivity, percent);
+
+    return true;
+  }
+
+  /**
+   * Builds the key under which the model of a metric series is tracked.
+   */
+  private static String buildKey(TimelineMetric metric) {
+    return metric.getMetricName() + "_" + metric.getAppId() + "_" + metric.getHostName();
+  }
+
+  /**
+   * Writes this technique as JSON to the given local file path.
+   * An I/O failure is logged and not propagated.
+   */
+  @Override
+  public void save(SparkContext sc, String path) {
+    Gson gson = new Gson();
+    try {
+      String json = gson.toJson(this);
+      try (Writer writer = new BufferedWriter(new OutputStreamWriter(
+        new FileOutputStream(path), "utf-8"))) {
+        writer.write(json);
+      }
+    } catch (IOException e) {
+      LOG.error("Unable to save EmaTechnique to " + path, e);
+    }
+  }
+
+  @Override
+  public String formatVersion() {
+    return "1.0";
+  }
+
+  public Map<String, EmaModel> getTrackedEmas() {
+    return trackedEmas;
+  }
+
+  public double getStartingWeight() {
+    return startingWeight;
+  }
+
+  public double getStartTimesSdev() {
+    return startTimesSdev;
+  }
+
+}
+
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
new file mode 100644
index 0000000..50bf9f2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods.hsdev;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import static org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.median;
+import static org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.sdev;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Flags a test period whose median deviates from the median of the training
+ * period by more than a per-metric multiple of the training period's standard deviation.
+ */
+public class HsdevTechnique implements Serializable {
+
+  // Per metric key: how many standard deviations the medians may differ.
+  private Map<String, Double> hsdevMap;
+  private String methodType = "hsdev";
+  private static final Log LOG = LogFactory.getLog(HsdevTechnique.class);
+
+  public HsdevTechnique() {
+    hsdevMap = new HashMap<>();
+  }
+
+  /**
+   * Compares the test period against the training period.
+   *
+   * @param key       key of the metric series
+   * @param trainData historic data of the series
+   * @param testData  data of the period under test
+   * @return an anomaly carrying the last test data point and the z-score of the
+   * median shift, or null when there is no anomaly or not enough data
+   */
+  public MetricAnomaly runHsdevTest(String key, DataSeries trainData, DataSeries testData) {
+    int testLength = testData.values.length;
+    int trainLength = trainData.values.length;
+
+    if (testLength == 0) {
+      // Nothing to test; the median and the last timestamp below would index an empty array.
+      LOG.info("No test data.");
+      return null;
+    }
+
+    if (trainLength < testLength) {
+      LOG.info("Not enough train data.");
+      return null;
+    }
+
+    if (!hsdevMap.containsKey(key)) {
+      hsdevMap.put(key, 3.0);
+    }
+
+    double n = hsdevMap.get(key);
+
+    double historicSd = sdev(trainData.values, false);
+    double historicMedian = median(trainData.values);
+    double currentMedian = median(testData.values);
+
+    double diff = Math.abs(currentMedian - historicMedian);
+    LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd);
+
+    if (diff > n * historicSd) {
+      // Only report an anomaly in the log once the threshold check has actually passed.
+      LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1]));
+      // NOTE: a historic standard deviation of 0 yields an infinite z-score here.
+      double zScore = diff / historicSd;
+      LOG.info("Z Score of current series : " + zScore);
+      return new MetricAnomaly(key,
+        (long) testData.ts[testLength - 1],
+        testData.values[testLength - 1],
+        methodType,
+        zScore);
+    }
+    return null;
+  }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java
new file mode 100644
index 0000000..ff8dbcf
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.prototype.methods.kstest;
+
+import org.apache.ambari.metrics.alertservice.prototype.RFunctionInvoker;
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Runs the KS test R function on a training and a test period and turns a
+ * non-empty result into a {@link MetricAnomaly}.
+ */
+public class KSTechnique implements Serializable {
+
+  private String methodType = "ks";
+  // Per metric key: the p-value passed to the R function.
+  private Map<String, Double> pValueMap;
+  private static final Log LOG = LogFactory.getLog(KSTechnique.class);
+
+  public KSTechnique() {
+    pValueMap = new HashMap<>();
+  }
+
+  /**
+   * Invokes the KS test for the given series.
+   *
+   * @param key       key of the metric series
+   * @param trainData historic data of the series
+   * @param testData  data of the period under test
+   * @return an anomaly carrying the last test data point and the D value of the
+   * test, or null when the R function reports nothing, fails, or there is not enough data
+   */
+  public MetricAnomaly runKsTest(String key, DataSeries trainData, DataSeries testData) {
+
+    int testLength = testData.values.length;
+    int trainLength = trainData.values.length;
+
+    if (testLength == 0) {
+      // The anomaly is built from the last test data point, which does not exist here.
+      LOG.info("No test data.");
+      return null;
+    }
+
+    if (trainLength < testLength) {
+      LOG.info("Not enough train data.");
+      return null;
+    }
+
+    if (!pValueMap.containsKey(key)) {
+      pValueMap.put(key, 0.05);
+    }
+    double pValue = pValueMap.get(key);
+
+    ResultSet result = RFunctionInvoker.ksTest(trainData, testData, Collections.singletonMap("ks.p_value", String.valueOf(pValue)));
+    if (result == null) {
+      LOG.error("Resultset is null when invoking KS R function...");
+      return null;
+    }
+
+    int resultSize = result.resultset.size();
+    if (resultSize == 0) {
+      return null;
+    }
+
+    // Rows 2 (D value) and 3 (p value) are read below; make sure they exist and are non-empty
+    // instead of failing with an index error on a truncated result.
+    if (resultSize < 4
+      || result.resultset.get(2).length == 0
+      || result.resultset.get(3).length == 0) {
+      LOG.error("Unexpected shape of KS result set, size = " + resultSize);
+      return null;
+    }
+
+    LOG.info("Is size 1 ? result size = " + result.resultset.get(0).length);
+    LOG.info("p_value = " + result.resultset.get(3)[0]);
+    double dValue = result.resultset.get(2)[0];
+
+    return new MetricAnomaly(key,
+      (long) testData.ts[testLength - 1],
+      testData.values[testLength - 1],
+      methodType,
+      dValue);
+  }
+
+  /**
+   * Scales the p-value tracked for a metric by the given percentage. Increasing
+   * the sensitivity raises the p-value, decreasing it lowers the p-value; the
+   * result is kept within [0, 1].
+   *
+   * @param metricKey           key of the metric series
+   * @param increaseSensitivity true to raise the p-value
+   * @param percent             size of the adjustment, in percent of the current p-value
+   */
+  public void updateModel(String metricKey, boolean increaseSensitivity, double percent) {
+
+    LOG.info("Updating KS model for " + metricKey + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent);
+
+    if (!pValueMap.containsKey(metricKey)) {
+      LOG.error("Unknown metric key : " + metricKey);
+      LOG.info("pValueMap :" + pValueMap.toString());
+      return;
+    }
+
+    double delta = percent / 100;
+    if (!increaseSensitivity) {
+      delta = delta * -1;
+    }
+
+    double pValue = pValueMap.get(metricKey);
+    // A percentage above 100 must not push the p-value below 0.
+    double newPValue = Math.max(0.0, Math.min(1.0, pValue + delta * pValue));
+    pValueMap.put(metricKey, newPValue);
+    LOG.info("New pValue = " + newPValue);
+  }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MethodResult.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
similarity index 77%
rename from ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MethodResult.java
rename to ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
index 6bf58df..a8e31bf 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/common/MethodResult.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
@@ -15,13 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ambari.metrics.alertservice.common;
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
 
-public abstract class MethodResult {
-    protected String methodType;
-    public abstract String prettyPrint();
+public interface AbstractMetricSeries {
+
+  public double nextValue();
+  public double[] getSeries(int n);
 
-    public String getMethodType() {
-        return methodType;
-    }
 }
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java
new file mode 100644
index 0000000..4158ff4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+/**
+ * Series generator that alternates between a run of values drawn around a low
+ * band value and a run of values drawn around a high band value.
+ */
+public class DualBandMetricSeries implements AbstractMetricSeries {
+
+  // Centre, relative deviation (used as a multiplier of the centre) and run length of the low band.
+  double lowBandValue = 0.0;
+  double lowBandDeviationPercentage = 0.0;
+  int lowBandPeriodSize = 10;
+  // Centre, relative deviation and run length of the high band.
+  double highBandValue = 1.0;
+  double highBandDeviationPercentage = 0.0;
+  int highBandPeriodSize = 10;
+
+  Random random = new Random();
+  double lowBandValueLowerLimit, lowBandValueHigherLimit;
+  double highBandLowerLimit, highBandUpperLimit;
+  // Number of values emitted so far in the current low (l) and high (h) run.
+  int l = 0, h = 0;
+
+  /**
+   * Stores the band parameters and derives the limits of both bands as
+   * {@code value -/+ deviationPercentage * value}.
+   */
+  public DualBandMetricSeries(double lowBandValue,
+                              double lowBandDeviationPercentage,
+                              int lowBandPeriodSize,
+                              double highBandValue,
+                              double highBandDeviationPercentage,
+                              int highBandPeriodSize) {
+    this.lowBandValue = lowBandValue;
+    this.lowBandDeviationPercentage = lowBandDeviationPercentage;
+    this.lowBandPeriodSize = lowBandPeriodSize;
+    this.highBandValue = highBandValue;
+    this.highBandDeviationPercentage = highBandDeviationPercentage;
+    this.highBandPeriodSize = highBandPeriodSize;
+
+    this.lowBandValueLowerLimit = lowBandValue - lowBandDeviationPercentage * lowBandValue;
+    this.lowBandValueHigherLimit = lowBandValue + lowBandDeviationPercentage * lowBandValue;
+    this.highBandLowerLimit = highBandValue - highBandDeviationPercentage * highBandValue;
+    this.highBandUpperLimit = highBandValue + highBandDeviationPercentage * highBandValue;
+  }
+
+  /**
+   * Returns the next value of the series: a value from the low band while the
+   * low run is incomplete, otherwise a value from the high band while the high
+   * run is incomplete, otherwise 0.0. Both run counters are reset once both
+   * runs are complete.
+   */
+  @Override
+  public double nextValue() {
+    final double next;
+
+    if (l < lowBandPeriodSize) {
+      next = sampleBetween(lowBandValueLowerLimit, lowBandValueHigherLimit);
+      l++;
+    } else if (h < highBandPeriodSize) {
+      next = sampleBetween(highBandLowerLimit, highBandUpperLimit);
+      h++;
+    } else {
+      next = 0.0;
+    }
+
+    boolean bothRunsComplete = (l == lowBandPeriodSize) && (h == highBandPeriodSize);
+    if (bothRunsComplete) {
+      l = 0;
+      h = 0;
+    }
+
+    return next;
+  }
+
+  /**
+   * Returns the next {@code n} values of the series.
+   */
+  @Override
+  public double[] getSeries(int n) {
+    double[] generated = new double[n];
+    int idx = 0;
+    while (idx < n) {
+      generated[idx] = nextValue();
+      idx++;
+    }
+    return generated;
+  }
+
+  /**
+   * Draws one random value between the two limits.
+   */
+  private double sampleBetween(double lowerLimit, double upperLimit) {
+    return lowerLimit + (upperLimit - lowerLimit) * random.nextDouble();
+  }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java
new file mode 100644
index 0000000..1e37ff3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Factory for synthetic metric series with different kinds of behaviour: normal, uniform,
+ * monotonic, dual band, step function and steady with a turbulent period.
+ */
+public class MetricSeriesGeneratorFactory {
+
+  /**
+   * Returns a uniform data series with some deviation % and outliers.
+   *
+   * @param n                                size of the data series
+   * @param value                            The value around which the uniform data series is centered on.
+   * @param deviationPercentage              The allowed deviation on either side of the uniform value, as a fraction of it. For example, if value = 10, and deviation % is 0.1, the series values lie between 9 and 11.
+   * @param outlierProbability               The probability of finding an outlier in the series.
+   * @param outlierDeviationLowerPercentage  min distance of an outlier from the uniform value, as a fraction of it. If value = 10 and this is 0.3, the nearest outliers are 7 and 13.
+   * @param outlierDeviationHigherPercentage max distance of an outlier from the uniform value, as a fraction of it. If value = 10 and this is 0.6, the farthest outliers are 4 and 16.
+   * @param outliersAboveValue               Outlier should be greater or smaller than the value.
+   * @return uniform series
+   */
+  public static double[] createUniformSeries(int n,
+                                             double value,
+                                             double deviationPercentage,
+                                             double outlierProbability,
+                                             double outlierDeviationLowerPercentage,
+                                             double outlierDeviationHigherPercentage,
+                                             boolean outliersAboveValue) {
+
+    UniformMetricSeries metricSeries = new UniformMetricSeries(value,
+      deviationPercentage,
+      outlierProbability,
+      outlierDeviationLowerPercentage,
+      outlierDeviationHigherPercentage,
+      outliersAboveValue);
+
+    return metricSeries.getSeries(n);
+  }
+
+
+  /**
+   * Returns a normally distributed series.
+   *
+   * @param n                             size of the data series
+   * @param mean                          mean of the distribution
+   * @param sd                            sd of the distribution
+   * @param outlierProbability            The probability of finding an outlier in the series.
+   * @param outlierDeviationSDTimesLower  Lower Limit of the outlier with respect to times sdev from the mean.
+   * @param outlierDeviationSDTimesHigher Higher Limit of the outlier with respect to times sdev from the mean.
+   * @param outlierOnRightEnd             Outlier should be on the right end or the left end.
+   * @return normal series
+   */
+  public static double[] createNormalSeries(int n,
+                                            double mean,
+                                            double sd,
+                                            double outlierProbability,
+                                            double outlierDeviationSDTimesLower,
+                                            double outlierDeviationSDTimesHigher,
+                                            boolean outlierOnRightEnd) {
+
+
+    NormalMetricSeries metricSeries = new NormalMetricSeries(mean,
+      sd,
+      outlierProbability,
+      outlierDeviationSDTimesLower,
+      outlierDeviationSDTimesHigher,
+      outlierOnRightEnd);
+
+    return metricSeries.getSeries(n);
+  }
+
+
+  /**
+   * Returns a monotonically increasing / decreasing series
+   *
+   * @param n                                size of the data series
+   * @param startValue                       Start value of the monotonic sequence
+   * @param slope                            direction of monotonicity m > 0 for increasing and m < 0 for decreasing.
+   * @param deviationPercentage              The allowed deviation on either side of the current 'y' value, as a fraction of it. For example, if current value = 10 according to slope, and deviation % is 0.1, the series values lie between 9 and 11.
+   * @param outlierProbability               The probability of finding an outlier in the series.
+   * @param outlierDeviationLowerPercentage  min distance of an outlier from the current 'y' value, as a fraction of it. If value = 10 and this is 0.3, the nearest outliers are 7 and 13.
+   * @param outlierDeviationHigherPercentage max distance of an outlier from the current 'y' value, as a fraction of it. If value = 10 and this is 0.6, the farthest outliers are 4 and 16.
+   * @param outliersAboveValue               Outlier should be greater or smaller than the 'y' value.
+   * @return monotonic series
+   */
+  public static double[] createMonotonicSeries(int n,
+                                               double startValue,
+                                               double slope,
+                                               double deviationPercentage,
+                                               double outlierProbability,
+                                               double outlierDeviationLowerPercentage,
+                                               double outlierDeviationHigherPercentage,
+                                               boolean outliersAboveValue) {
+
+    MonotonicMetricSeries metricSeries = new MonotonicMetricSeries(startValue,
+      slope,
+      deviationPercentage,
+      outlierProbability,
+      outlierDeviationLowerPercentage,
+      outlierDeviationHigherPercentage,
+      outliersAboveValue);
+
+    return metricSeries.getSeries(n);
+  }
+
+
+  /**
+   * Returns a dual band series (lower and higher)
+   *
+   * @param n                           size of the data series
+   * @param lowBandValue                low band centre value
+   * @param lowBandDeviationPercentage  low band deviation
+   * @param lowBandPeriodSize           low band size (number of points)
+   * @param highBandValue               high band centre value
+   * @param highBandDeviationPercentage high band deviation.
+   * @param highBandPeriodSize          high band size (number of points)
+   * @return dual band series
+   */
+  public static double[] getDualBandSeries(int n,
+                                           double lowBandValue,
+                                           double lowBandDeviationPercentage,
+                                           int lowBandPeriodSize,
+                                           double highBandValue,
+                                           double highBandDeviationPercentage,
+                                           int highBandPeriodSize) {
+
+    DualBandMetricSeries metricSeries  = new DualBandMetricSeries(lowBandValue,
+      lowBandDeviationPercentage,
+      lowBandPeriodSize,
+      highBandValue,
+      highBandDeviationPercentage,
+      highBandPeriodSize);
+
+    return metricSeries.getSeries(n);
+  }
+
+  /**
+   * Returns a step function series.
+   *
+   * @param n                              size of the data series
+   * @param startValue                     start steady value
+   * @param steadyValueDeviationPercentage required deviation in the steady state value
+   * @param steadyPeriodSlope              direction of monotonicity m > 0 for increasing and m < 0 for decreasing, m = 0 no increase or decrease.
+   * @param steadyPeriodMinSize            min size for step period
+   * @param steadyPeriodMaxSize            max size for step period.
+   * @param stepChangePercentage           Increase / decrease in steady state to denote a step in terms of deviation percentage from the last value.
+   * @param upwardStep                     upward or downward step.
+   * @return step function series
+   */
+  public static double[] getStepFunctionSeries(int n,
+                                               double startValue,
+                                               double steadyValueDeviationPercentage,
+                                               double steadyPeriodSlope,
+                                               int steadyPeriodMinSize,
+                                               int steadyPeriodMaxSize,
+                                               double stepChangePercentage,
+                                               boolean upwardStep) {
+
+    StepFunctionMetricSeries metricSeries = new StepFunctionMetricSeries(startValue,
+      steadyValueDeviationPercentage,
+      steadyPeriodSlope,
+      steadyPeriodMinSize,
+      steadyPeriodMaxSize,
+      stepChangePercentage,
+      upwardStep);
+
+    return metricSeries.getSeries(n);
+  }
+
+  /**
+   * Series with small period of turbulence and then back to steady.
+   *
+   * @param n                                        size of the data series
+   * @param steadyStateValue                         steady state center value
+   * @param steadyStateDeviationPercentage           steady state deviation in percentage
+   * @param turbulentPeriodDeviationLowerPercentage  turbulent state lower limit in terms of percentage from centre value.
+   * @param turbulentPeriodDeviationHigherPercentage turbulent state higher limit in terms of percentage from centre value.
+   * @param turbulentPeriodLength                    turbulent period length (number of points)
+   * @param turbulentStatePosition                   Where the turbulent state should be 0 - at the beginning, 1 - in the middle (25% - 50% of the series), 2 - at the end of the series.
+   * @return steady series containing one turbulent period
+   */
+  public static double[] getSteadySeriesWithTurbulentPeriod(int n,
+                                                            double steadyStateValue,
+                                                            double steadyStateDeviationPercentage,
+                                                            double turbulentPeriodDeviationLowerPercentage,
+                                                            double turbulentPeriodDeviationHigherPercentage,
+                                                            int turbulentPeriodLength,
+                                                            int turbulentStatePosition
+  ) {
+
+
+    SteadyWithTurbulenceMetricSeries metricSeries = new SteadyWithTurbulenceMetricSeries(n,
+      steadyStateValue,
+      steadyStateDeviationPercentage,
+      turbulentPeriodDeviationLowerPercentage,
+      turbulentPeriodDeviationHigherPercentage,
+      turbulentPeriodLength,
+      turbulentStatePosition);
+
+    return metricSeries.getSeries(n);
+  }
+
+
+  /**
+   * Generates n points of the kind of series named by {@code type}.
+   *
+   * Recognised types are "normal", "uniform", "monotonic", "dualband", "step" and "turbulence".
+   * Any other type (including null) yields a normal series with mean 0, sd 1 and no outliers.
+   * Settings are read from {@code configs} under the parameter names of the matching
+   * create / get method of this class; a missing setting falls back to a built-in default.
+   * For "turbulence" the approximate series length handed to the generator is n.
+   *
+   * @param type    kind of series to generate
+   * @param n       size of the data series
+   * @param configs generator settings as strings; null is treated like an empty map
+   * @return the generated series
+   * @throws NumberFormatException if a numeric setting cannot be parsed
+   */
+  public static double[] generateSeries(String type, int n, Map<String, String> configs) {
+    return buildGenerator(type, configs, n).getSeries(n);
+  }
+
+  /**
+   * Creates the generator for the kind of series named by {@code type}, without producing
+   * any points yet.
+   *
+   * Types, settings and defaults are the same as for
+   * {@link #generateSeries(String, int, Map)}, except that for "turbulence" the approximate
+   * series length is read from the "approxSeriesLength" setting (default 100).
+   *
+   * @param type    kind of series to generate
+   * @param configs generator settings as strings; null is treated like an empty map
+   * @return the configured generator
+   * @throws NumberFormatException if a numeric setting cannot be parsed
+   */
+  public static AbstractMetricSeries generateSeries(String type, Map<String, String> configs) {
+    return buildGenerator(type, configs, null);
+  }
+
+  /**
+   * Single place where a type name and its settings are turned into a generator, so that
+   * both generateSeries overloads always agree on setting names and defaults.
+   *
+   * @param turbulenceSeriesLength approximate series length for the "turbulence" type, or
+   *                               null to read it from the "approxSeriesLength" setting
+   */
+  private static AbstractMetricSeries buildGenerator(String type,
+                                                     Map<String, String> configs,
+                                                     Integer turbulenceSeriesLength) {
+
+    // A null type takes the same fallback as an unknown one instead of failing in the switch.
+    switch (type == null ? "" : type) {
+
+      case "normal":
+        return new NormalMetricSeries(
+          doubleSetting(configs, "mean", "0"),
+          doubleSetting(configs, "sd", "1"),
+          doubleSetting(configs, "outlierProbability", "0"),
+          doubleSetting(configs, "outlierDeviationSDTimesLower", "0"),
+          doubleSetting(configs, "outlierDeviationSDTimesHigher", "0"),
+          booleanSetting(configs, "outlierOnRightEnd", "true"));
+
+      case "uniform":
+        return new UniformMetricSeries(
+          doubleSetting(configs, "value", "10"),
+          doubleSetting(configs, "deviationPercentage", "0"),
+          doubleSetting(configs, "outlierProbability", "0"),
+          doubleSetting(configs, "outlierDeviationLowerPercentage", "0"),
+          doubleSetting(configs, "outlierDeviationHigherPercentage", "0"),
+          booleanSetting(configs, "outliersAboveValue", "true"));
+
+      case "monotonic":
+        return new MonotonicMetricSeries(
+          doubleSetting(configs, "startValue", "10"),
+          doubleSetting(configs, "slope", "0"),
+          doubleSetting(configs, "deviationPercentage", "0"),
+          doubleSetting(configs, "outlierProbability", "0"),
+          doubleSetting(configs, "outlierDeviationLowerPercentage", "0"),
+          doubleSetting(configs, "outlierDeviationHigherPercentage", "0"),
+          booleanSetting(configs, "outliersAboveValue", "true"));
+
+      case "dualband":
+        return new DualBandMetricSeries(
+          doubleSetting(configs, "lowBandValue", "10"),
+          doubleSetting(configs, "lowBandDeviationPercentage", "0"),
+          intSetting(configs, "lowBandPeriodSize", "0"),
+          doubleSetting(configs, "highBandValue", "10"),
+          doubleSetting(configs, "highBandDeviationPercentage", "0"),
+          intSetting(configs, "highBandPeriodSize", "0"));
+
+      case "step":
+        return new StepFunctionMetricSeries(
+          doubleSetting(configs, "startValue", "10"),
+          doubleSetting(configs, "steadyValueDeviationPercentage", "0"),
+          doubleSetting(configs, "steadyPeriodSlope", "0"),
+          intSetting(configs, "steadyPeriodMinSize", "0"),
+          intSetting(configs, "steadyPeriodMaxSize", "0"),
+          doubleSetting(configs, "stepChangePercentage", "0"),
+          booleanSetting(configs, "upwardStep", "true"));
+
+      case "turbulence":
+        return new SteadyWithTurbulenceMetricSeries(
+          turbulenceSeriesLength != null
+            ? turbulenceSeriesLength
+            : intSetting(configs, "approxSeriesLength", "100"),
+          doubleSetting(configs, "steadyStateValue", "10"),
+          doubleSetting(configs, "steadyStateDeviationPercentage", "0"),
+          doubleSetting(configs, "turbulentPeriodDeviationLowerPercentage", "0"),
+          doubleSetting(configs, "turbulentPeriodDeviationHigherPercentage", "10"),
+          intSetting(configs, "turbulentPeriodLength", "0"),
+          intSetting(configs, "turbulentStatePosition", "0"));
+
+      default:
+        return new NormalMetricSeries(0,
+          1,
+          0,
+          0,
+          0,
+          true);
+    }
+  }
+
+  /** Looks up a raw setting, using the fallback when the map is null or has no such key. */
+  private static String setting(Map<String, String> configs, String key, String fallback) {
+    return configs == null ? fallback : configs.getOrDefault(key, fallback);
+  }
+
+  /** Reads a setting as a double. */
+  private static double doubleSetting(Map<String, String> configs, String key, String fallback) {
+    return Double.parseDouble(setting(configs, key, fallback));
+  }
+
+  /** Reads a setting as an int. */
+  private static int intSetting(Map<String, String> configs, String key, String fallback) {
+    return Integer.parseInt(setting(configs, key, fallback));
+  }
+
+  /** Reads a setting as a boolean. */
+  private static boolean booleanSetting(Map<String, String> configs, String key, String fallback) {
+    return Boolean.parseBoolean(setting(configs, key, fallback));
+  }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java
new file mode 100644
index 0000000..a883d08
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+/**
+ * Generates points scattered around the straight line y = m * x + c, with an optional
+ * share of outliers placed on one chosen side of the line.
+ */
+public class MonotonicMetricSeries implements AbstractMetricSeries {
+
+  double startValue = 0.0;
+  double slope = 0.5;
+  double deviationPercentage = 0.0;
+  double outlierProbability = 0.0;
+  double outlierDeviationLowerPercentage = 0.0;
+  double outlierDeviationHigherPercentage = 0.0;
+  boolean outliersAboveValue = true;
+
+  Random random = new Random();
+  double nonOutlierProbability;
+
+  // y = mx + c
+  double y;
+  double m;
+  double x;
+  double c;
+
+  /**
+   * @param startValue                       value of the line at the first point
+   * @param slope                            change of the line per point; > 0 increasing, < 0 decreasing
+   * @param deviationPercentage              allowed deviation on either side of the line, as a fraction of the line's current value
+   * @param outlierProbability               probability that a point is an outlier
+   * @param outlierDeviationLowerPercentage  min distance of an outlier from the line, as a fraction of the line's current value
+   * @param outlierDeviationHigherPercentage max distance of an outlier from the line, as a fraction of the line's current value
+   * @param outliersAboveValue               true to place outliers above the line, false to place them below
+   */
+  public MonotonicMetricSeries(double startValue,
+                               double slope,
+                               double deviationPercentage,
+                               double outlierProbability,
+                               double outlierDeviationLowerPercentage,
+                               double outlierDeviationHigherPercentage,
+                               boolean outliersAboveValue) {
+    this.startValue = startValue;
+    this.slope = slope;
+    this.deviationPercentage = deviationPercentage;
+    this.outlierProbability = outlierProbability;
+    this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage;
+    this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage;
+    this.outliersAboveValue = outliersAboveValue;
+    init();
+  }
+
+  /** Fixes the line so that it passes through startValue at x = 1. */
+  private void init() {
+    y = startValue;
+    m = slope;
+    x = 1;
+    c = y - (m * x);
+    nonOutlierProbability = 1.0 - outlierProbability;
+  }
+
+  /**
+   * Returns a point for the current x and then advances x by one.
+   *
+   * @return a value within the deviation band around the line, or an outlier on the
+   *         configured side of the line
+   */
+  @Override
+  public double nextValue() {
+
+    double value;
+    double probability = random.nextDouble();
+
+    y = m * x + c;
+    // Deviations are scaled by |y|. Scaling by y itself flips the side an outlier lands on
+    // once the line drops below zero ("above" would produce values below the line).
+    double magnitude = Math.abs(y);
+
+    if (probability <= nonOutlierProbability) {
+      double valueDeviationLowerLimit = y - deviationPercentage * magnitude;
+      double valueDeviationHigherLimit = y + deviationPercentage * magnitude;
+      value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
+    } else if (outliersAboveValue) {
+      double outlierLowerLimit = y + outlierDeviationLowerPercentage * magnitude;
+      double outlierUpperLimit = y + outlierDeviationHigherPercentage * magnitude;
+      value = outlierLowerLimit + (outlierUpperLimit - outlierLowerLimit) * random.nextDouble();
+    } else {
+      // Below the line the "higher" percentage gives the smaller (farther) limit.
+      double outlierNearLimit = y - outlierDeviationLowerPercentage * magnitude;
+      double outlierFarLimit = y - outlierDeviationHigherPercentage * magnitude;
+      value = outlierFarLimit + (outlierNearLimit - outlierFarLimit) * random.nextDouble();
+    }
+    x++;
+    return value;
+  }
+
+  /**
+   * Collects the results of n consecutive {@link #nextValue()} calls.
+   *
+   * @param n number of points to generate
+   * @return array holding the n generated points
+   */
+  @Override
+  public double[] getSeries(int n) {
+    double[] series = new double[n];
+    for (int i = 0; i < n; i++) {
+      series[i] = nextValue();
+    }
+    return series;
+  }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java
new file mode 100644
index 0000000..cc83d2c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+/**
+ * Generates normally distributed points around a mean, with an optional share of outliers
+ * placed a configurable number of standard deviations to one side of the mean.
+ */
+public class NormalMetricSeries implements AbstractMetricSeries {
+
+  double mean = 0.0;
+  double sd = 1.0;
+  double outlierProbability = 0.0;
+  double outlierDeviationSDTimesLower = 0.0;
+  double outlierDeviationSDTimesHigher = 0.0;
+  boolean outlierOnRightEnd = true;
+
+  Random random = new Random();
+  double nonOutlierProbability;
+
+
+  /**
+   * @param mean                          mean of the distribution
+   * @param sd                            standard deviation of the distribution
+   * @param outlierProbability            probability that a point is an outlier
+   * @param outlierDeviationSDTimesLower  min distance of an outlier from the mean, in multiples of sd
+   * @param outlierDeviationSDTimesHigher max distance of an outlier from the mean, in multiples of sd
+   * @param outlierOnRightEnd             true to place outliers above the mean, false to place them below
+   */
+  public NormalMetricSeries(double mean,
+                            double sd,
+                            double outlierProbability,
+                            double outlierDeviationSDTimesLower,
+                            double outlierDeviationSDTimesHigher,
+                            boolean outlierOnRightEnd) {
+    this.mean = mean;
+    this.sd = sd;
+    this.outlierProbability = outlierProbability;
+    this.outlierDeviationSDTimesLower = outlierDeviationSDTimesLower;
+    this.outlierDeviationSDTimesHigher = outlierDeviationSDTimesHigher;
+    this.outlierOnRightEnd = outlierOnRightEnd;
+    this.nonOutlierProbability = 1.0 - outlierProbability;
+  }
+
+  /**
+   * Returns either a Gaussian sample around the mean or, with the configured probability,
+   * an outlier on the configured side of the mean.
+   */
+  @Override
+  public double nextValue() {
+
+    double draw = random.nextDouble();
+    if (draw <= nonOutlierProbability) {
+      return random.nextGaussian() * sd + mean;
+    }
+
+    // Outlier: pick how many standard deviations away from the mean it sits.
+    double sdTimesRange = outlierDeviationSDTimesHigher - outlierDeviationSDTimesLower;
+    double sdTimes = outlierDeviationSDTimesLower + sdTimesRange * random.nextDouble();
+    double offset = sdTimes * sd;
+
+    return outlierOnRightEnd ? mean + offset : mean - offset;
+  }
+
+  /**
+   * Collects the results of n consecutive {@link #nextValue()} calls.
+   *
+   * @param n number of points to generate
+   * @return array holding the n generated points
+   */
+  @Override
+  public double[] getSeries(int n) {
+    double[] points = new double[n];
+    int position = 0;
+    while (position < n) {
+      points[position] = nextValue();
+      position++;
+    }
+    return points;
+  }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java
new file mode 100644
index 0000000..c4ed3ba
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+/**
+ * Generates a steady series (points within a deviation band around a centre value) that
+ * contains one turbulent period, during which the points are shifted away from the centre.
+ */
+public class SteadyWithTurbulenceMetricSeries implements AbstractMetricSeries {
+
+  double steadyStateValue = 0.0;
+  double steadyStateDeviationPercentage = 0.0;
+  double turbulentPeriodDeviationLowerPercentage = 0.3;
+  double turbulentPeriodDeviationHigherPercentage = 0.5;
+  int turbulentPeriodLength = 5;
+  int turbulentStatePosition = 1;
+  int approximateSeriesLength = 10;
+
+  Random random = new Random();
+  double valueDeviationLowerLimit;
+  double valueDeviationHigherLimit;
+  double tPeriodLowerLimit;
+  double tPeriodUpperLimit;
+  int tPeriodStartIndex = 0;
+  int index = 0;
+
+  /**
+   * @param approximateSeriesLength                  expected number of {@link #nextValue()} calls; used to place the turbulent period
+   * @param steadyStateValue                         steady state centre value
+   * @param steadyStateDeviationPercentage           steady state deviation, as a fraction of the centre value
+   * @param turbulentPeriodDeviationLowerPercentage  turbulent state lower limit, as a fraction of the centre value added to it
+   * @param turbulentPeriodDeviationHigherPercentage turbulent state higher limit, as a fraction of the centre value added to it
+   * @param turbulentPeriodLength                    turbulent period length (number of points)
+   * @param turbulentStatePosition                   1 - turbulent period starts between 25% and 50% of the series, 2 - at the end, any other value - at the beginning
+   */
+  public SteadyWithTurbulenceMetricSeries(int approximateSeriesLength,
+                                          double steadyStateValue,
+                                          double steadyStateDeviationPercentage,
+                                          double turbulentPeriodDeviationLowerPercentage,
+                                          double turbulentPeriodDeviationHigherPercentage,
+                                          int turbulentPeriodLength,
+                                          int turbulentStatePosition) {
+    this.approximateSeriesLength = approximateSeriesLength;
+    this.steadyStateValue = steadyStateValue;
+    this.steadyStateDeviationPercentage = steadyStateDeviationPercentage;
+    this.turbulentPeriodDeviationLowerPercentage = turbulentPeriodDeviationLowerPercentage;
+    this.turbulentPeriodDeviationHigherPercentage = turbulentPeriodDeviationHigherPercentage;
+    this.turbulentPeriodLength = turbulentPeriodLength;
+    this.turbulentStatePosition = turbulentStatePosition;
+    init();
+  }
+
+  /** Pre-computes the turbulent period position and the value limits used by nextValue(). */
+  private void init() {
+
+    tPeriodStartIndex = turbulentStartIndex(approximateSeriesLength);
+
+    valueDeviationLowerLimit = steadyStateValue - steadyStateDeviationPercentage * steadyStateValue;
+    valueDeviationHigherLimit = steadyStateValue + steadyStateDeviationPercentage * steadyStateValue;
+
+    tPeriodLowerLimit = steadyStateValue + turbulentPeriodDeviationLowerPercentage * steadyStateValue;
+    tPeriodUpperLimit = steadyStateValue + turbulentPeriodDeviationHigherPercentage * steadyStateValue;
+  }
+
+  /**
+   * Picks the index at which the turbulent period starts in a series of the given length,
+   * according to turbulentStatePosition.
+   */
+  private int turbulentStartIndex(int seriesLength) {
+    if (turbulentStatePosition == 1) {
+      return (int) (0.25 * seriesLength + (0.25 * seriesLength * random.nextDouble()));
+    } else if (turbulentStatePosition == 2) {
+      return seriesLength - turbulentPeriodLength;
+    }
+    return 0;
+  }
+
+  /**
+   * Returns the next point of the series; the position of the turbulent period is based on
+   * the approximate series length given to the constructor.
+   */
+  @Override
+  public double nextValue() {
+
+    double value;
+
+    // Exclusive upper bound so that exactly turbulentPeriodLength points are turbulent,
+    // matching getSeries(int); an inclusive bound would add one extra turbulent point.
+    if (index >= tPeriodStartIndex && index < (tPeriodStartIndex + turbulentPeriodLength)) {
+      value = tPeriodLowerLimit + (tPeriodUpperLimit - tPeriodLowerLimit) * random.nextDouble();
+    } else {
+      value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
+    }
+    index++;
+    return value;
+  }
+
+  /**
+   * Builds a complete series of n points with the turbulent period positioned relative
+   * to n. This does not use or advance the position tracked by {@link #nextValue()}.
+   *
+   * @param n number of points to generate
+   * @return array holding the n generated points
+   */
+  @Override
+  public double[] getSeries(int n) {
+
+    double[] series = new double[n];
+    int turbulentPeriodStartIndex = turbulentStartIndex(n);
+
+    double valueDevLowerLimit = steadyStateValue - steadyStateDeviationPercentage * steadyStateValue;
+    double valueDevHigherLimit = steadyStateValue + steadyStateDeviationPercentage * steadyStateValue;
+
+    double turbulentPeriodLowerLimit = steadyStateValue + turbulentPeriodDeviationLowerPercentage * steadyStateValue;
+    double turbulentPeriodUpperLimit = steadyStateValue + turbulentPeriodDeviationHigherPercentage * steadyStateValue;
+
+    for (int i = 0; i < n; i++) {
+      if (i >= turbulentPeriodStartIndex && i < (turbulentPeriodStartIndex + turbulentPeriodLength)) {
+        series[i] = turbulentPeriodLowerLimit + (turbulentPeriodUpperLimit - turbulentPeriodLowerLimit) * random.nextDouble();
+      } else {
+        series[i] = valueDevLowerLimit + (valueDevHigherLimit - valueDevLowerLimit) * random.nextDouble();
+      }
+    }
+
+    return series;
+  }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
new file mode 100644
index 0000000..d5beb48
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+/**
+ * Synthetic metric series shaped like a staircase: runs of steady samples that follow the
+ * line y = m * x + c with uniform noise, separated by abrupt steps up or down.
+ */
+public class StepFunctionMetricSeries implements AbstractMetricSeries {
+
+  double startValue = 0.0;
+  double steadyValueDeviationPercentage = 0.0;
+  double steadyPeriodSlope = 0.5;
+  int steadyPeriodMinSize = 10;
+  int steadyPeriodMaxSize = 20;
+  double stepChangePercentage = 0.0;
+  boolean upwardStep = true;
+
+  Random random = new Random();
+
+  // The current steady period is the line y = m * x + c; x restarts at 1 after every step.
+  double y;
+  double m;
+  double x;
+  double c;
+  // Length of the current steady period and the number of samples already emitted from it.
+  int currentStepSize;
+  int currentIndex;
+
+  /**
+   * @param startValue noiseless value of the first sample
+   * @param steadyValueDeviationPercentage noise band as a fraction of the noiseless value
+   *                                       (0.1 keeps samples within +/-10% of it)
+   * @param steadyPeriodSlope change of the noiseless value per sample inside a steady period
+   * @param steadyPeriodMinSize lower bound used when drawing a steady period length
+   * @param steadyPeriodMaxSize upper bound used when drawing a steady period length
+   * @param stepChangePercentage size of each step as a fraction of the noiseless value
+   * @param upwardStep true to step up, false to step down
+   */
+  public StepFunctionMetricSeries(double startValue,
+                                  double steadyValueDeviationPercentage,
+                                  double steadyPeriodSlope,
+                                  int steadyPeriodMinSize,
+                                  int steadyPeriodMaxSize,
+                                  double stepChangePercentage,
+                                  boolean upwardStep) {
+    this.startValue = startValue;
+    this.steadyValueDeviationPercentage = steadyValueDeviationPercentage;
+    this.steadyPeriodSlope = steadyPeriodSlope;
+    this.steadyPeriodMinSize = steadyPeriodMinSize;
+    this.steadyPeriodMaxSize = steadyPeriodMaxSize;
+    this.stepChangePercentage = stepChangePercentage;
+    this.upwardStep = upwardStep;
+    init();
+  }
+
+  /** Positions the generator at startValue and draws the first steady period length. */
+  private void init() {
+    y = startValue;
+    m = steadyPeriodSlope;
+    x = 1;
+    c = y - (m * x);
+
+    currentStepSize = drawStepSize();
+    currentIndex = 0;
+  }
+
+  /**
+   * Draws the length of the next steady period from the configured size bounds.
+   * The result is clamped to at least 1: a period of length 0 (or less) would leave
+   * nextValue() without a sample to return.
+   */
+  private int drawStepSize() {
+    int size = (int) (steadyPeriodMinSize + (steadyPeriodMaxSize - steadyPeriodMinSize) * random.nextDouble());
+    return Math.max(1, size);
+  }
+
+  /** Applies the step to the noiseless value and restarts the line from the stepped value. */
+  private void startNextSteadyPeriod() {
+    currentIndex = 0;
+    currentStepSize = drawStepSize();
+    if (upwardStep) {
+      y = y + stepChangePercentage * y;
+    } else {
+      y = y - stepChangePercentage * y;
+    }
+    x = 1;
+    c = y - (m * x);
+  }
+
+  /**
+   * Returns the next sample of the series and, once the current steady period is used up,
+   * applies the step so the following call starts a new period.
+   *
+   * @return the noiseless line value with uniform noise of +/- steadyValueDeviationPercentage
+   */
+  @Override
+  public double nextValue() {
+    y = m * x + c;
+    double valueDeviationLowerLimit = y - steadyValueDeviationPercentage * y;
+    double valueDeviationHigherLimit = y + steadyValueDeviationPercentage * y;
+    double value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
+    x++;
+    currentIndex++;
+
+    if (currentIndex >= currentStepSize) {
+      startNextSteadyPeriod();
+    }
+    return value;
+  }
+
+  /** Returns the next n samples in order; the generator state advances by n samples. */
+  @Override
+  public double[] getSeries(int n) {
+    double[] series = new double[n];
+    for (int i = 0; i < n; i++) {
+      series[i] = nextValue();
+    }
+    return series;
+  }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
new file mode 100644
index 0000000..a2b0eea
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+/**
+ * Synthetic metric series whose samples are spread uniformly around a fixed value, with
+ * occasional outliers drawn from a separate band above or below that value.
+ */
+public class UniformMetricSeries implements AbstractMetricSeries {
+
+  double value = 0.0;
+  double deviationPercentage = 0.0;
+  double outlierProbability = 0.0;
+  double outlierDeviationLowerPercentage = 0.0;
+  double outlierDeviationHigherPercentage = 0.0;
+  boolean outliersAboveValue = true;
+
+  Random random = new Random();
+  // Band limits and threshold derived from the settings above by init().
+  double valueDeviationLowerLimit;
+  double valueDeviationHigherLimit;
+  double outlierLeftLowerLimit;
+  double outlierLeftHigherLimit;
+  double outlierRightLowerLimit;
+  double outlierRightUpperLimit;
+  double nonOutlierProbability;
+
+  /**
+   * @param value center of the series
+   * @param deviationPercentage half-width of the regular band as a fraction of value
+   * @param outlierProbability probability that a sample is drawn from the outlier band
+   * @param outlierDeviationLowerPercentage distance of the outlier band's near edge from value, as a fraction of value
+   * @param outlierDeviationHigherPercentage distance of the outlier band's far edge from value, as a fraction of value
+   * @param outliersAboveValue true to place outliers above value, false to place them below
+   */
+  public UniformMetricSeries(double value,
+                             double deviationPercentage,
+                             double outlierProbability,
+                             double outlierDeviationLowerPercentage,
+                             double outlierDeviationHigherPercentage,
+                             boolean outliersAboveValue) {
+    this.value = value;
+    this.deviationPercentage = deviationPercentage;
+    this.outlierProbability = outlierProbability;
+    this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage;
+    this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage;
+    this.outliersAboveValue = outliersAboveValue;
+    init();
+  }
+
+  /** Precomputes the regular band, both outlier bands and the non-outlier threshold. */
+  private void init() {
+    valueDeviationLowerLimit = value - deviationPercentage * value;
+    valueDeviationHigherLimit = value + deviationPercentage * value;
+
+    outlierLeftLowerLimit = value - outlierDeviationHigherPercentage * value;
+    outlierLeftHigherLimit = value - outlierDeviationLowerPercentage * value;
+    outlierRightLowerLimit = value + outlierDeviationLowerPercentage * value;
+    outlierRightUpperLimit = value + outlierDeviationHigherPercentage * value;
+
+    nonOutlierProbability = 1.0 - outlierProbability;
+  }
+
+  /**
+   * Returns the next sample: a regular one with probability nonOutlierProbability,
+   * otherwise an outlier on the side selected by outliersAboveValue.
+   *
+   * @return a value drawn uniformly from the chosen band
+   */
+  @Override
+  public double nextValue() {
+    // nextDouble() is in [0, 1), so a strict comparison hits nonOutlierProbability exactly
+    // and an outlierProbability of 1.0 can never yield a regular sample.
+    if (random.nextDouble() < nonOutlierProbability) {
+      return uniform(valueDeviationLowerLimit, valueDeviationHigherLimit);
+    }
+    if (outliersAboveValue) {
+      return uniform(outlierRightLowerLimit, outlierRightUpperLimit);
+    }
+    return uniform(outlierLeftLowerLimit, outlierLeftHigherLimit);
+  }
+
+  /** Draws one value uniformly between the two limits. */
+  private double uniform(double lowerLimit, double higherLimit) {
+    return lowerLimit + (higherLimit - lowerLimit) * random.nextDouble();
+  }
+
+  /** Returns the next n samples in order. */
+  @Override
+  public double[] getSeries(int n) {
+    double[] series = new double[n];
+    for (int i = 0; i < n; i++) {
+      series[i] = nextValue();
+    }
+    return series;
+  }
+
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java
deleted file mode 100644
index d65790e..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.spark;
-
-import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.codehaus.jackson.map.AnnotationIntrospector;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.net.HttpURLConnection;
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.*;
-
-public class AnomalyMetricPublisher implements Serializable {
-
-    private String hostName = "UNKNOWN.example.com";
-    private String instanceId = null;
-    private String serviceName = "anomaly-engine";
-    private String collectorHost;
-    private String protocol;
-    private String port;
-    private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
-    private static final Log LOG = LogFactory.getLog(AnomalyMetricPublisher.class);
-    private static ObjectMapper mapper;
-
-    static {
-        mapper = new ObjectMapper();
-        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
-        mapper.setAnnotationIntrospector(introspector);
-        mapper.getSerializationConfig()
-                .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
-    }
-
-    public AnomalyMetricPublisher(String collectorHost, String protocol, String port) {
-        this.collectorHost = collectorHost;
-        this.protocol = protocol;
-        this.port = port;
-        this.hostName = getDefaultLocalHostName();
-    }
-
-    private String getDefaultLocalHostName() {
-        try {
-            return InetAddress.getLocalHost().getCanonicalHostName();
-        } catch (UnknownHostException e) {
-            LOG.info("Error getting host address");
-        }
-        return null;
-    }
-
-    public void publish(List<MetricAnomaly> metricAnomalies) {
-        LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
-        List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
-        if (!metricList.isEmpty()) {
-            TimelineMetrics timelineMetrics = new TimelineMetrics();
-            timelineMetrics.setMetrics(metricList);
-            emitMetrics(timelineMetrics);
-        }
-    }
-
-    private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
-        List<TimelineMetric> metrics = new ArrayList<>();
-
-        if (metricAnomalies.isEmpty()) {
-            return metrics;
-        }
-
-        long currentTime = System.currentTimeMillis();
-        MetricAnomaly prevAnomaly = metricAnomalies.get(0);
-
-        TimelineMetric timelineMetric = new TimelineMetric();
-        timelineMetric.setMetricName(prevAnomaly.getMetricKey() + "_" + prevAnomaly.getMethodResult().getMethodType());
-        timelineMetric.setAppId(serviceName);
-        timelineMetric.setInstanceId(instanceId);
-        timelineMetric.setHostName(hostName);
-        timelineMetric.setStartTime(currentTime);
-
-        TreeMap<Long,Double> metricValues = new TreeMap<>();
-        metricValues.put(prevAnomaly.getTimestamp(), prevAnomaly.getMetricValue());
-        MetricAnomaly currentAnomaly;
-
-        for (int i = 1; i < metricAnomalies.size(); i++) {
-            currentAnomaly = metricAnomalies.get(i);
-            if (currentAnomaly.getMetricKey().equals(prevAnomaly.getMetricKey())) {
-                metricValues.put(currentAnomaly.getTimestamp(), currentAnomaly.getMetricValue());
-            } else {
-                timelineMetric.setMetricValues(metricValues);
-                metrics.add(timelineMetric);
-
-                timelineMetric = new TimelineMetric();
-                timelineMetric.setMetricName(currentAnomaly.getMetricKey() + "_" + currentAnomaly.getMethodResult().getMethodType());
-                timelineMetric.setAppId(serviceName);
-                timelineMetric.setInstanceId(instanceId);
-                timelineMetric.setHostName(hostName);
-                timelineMetric.setStartTime(currentTime);
-                metricValues = new TreeMap<>();
-                metricValues.put(currentAnomaly.getTimestamp(), currentAnomaly.getMetricValue());
-                prevAnomaly = currentAnomaly;
-            }
-        }
-
-        timelineMetric.setMetricValues(metricValues);
-        metrics.add(timelineMetric);
-        return metrics;
-    }
-
-    private boolean emitMetrics(TimelineMetrics metrics) {
-        String connectUrl = constructTimelineMetricUri();
-        String jsonData = null;
-        LOG.info("EmitMetrics connectUrl = "  + connectUrl);
-        try {
-            jsonData = mapper.writeValueAsString(metrics);
-        } catch (IOException e) {
-            LOG.error("Unable to parse metrics", e);
-        }
-        if (jsonData != null) {
-            return emitMetricsJson(connectUrl, jsonData);
-        }
-        return false;
-    }
-
-    private HttpURLConnection getConnection(String spec) throws IOException {
-        return (HttpURLConnection) new URL(spec).openConnection();
-    }
-
-    private boolean emitMetricsJson(String connectUrl, String jsonData) {
-        int timeout = 10000;
-        HttpURLConnection connection = null;
-        try {
-            if (connectUrl == null) {
-                throw new IOException("Unknown URL. Unable to connect to metrics collector.");
-            }
-            connection = getConnection(connectUrl);
-
-            connection.setRequestMethod("POST");
-            connection.setRequestProperty("Content-Type", "application/json");
-            connection.setRequestProperty("Connection", "Keep-Alive");
-            connection.setConnectTimeout(timeout);
-            connection.setReadTimeout(timeout);
-            connection.setDoOutput(true);
-
-            if (jsonData != null) {
-                try (OutputStream os = connection.getOutputStream()) {
-                    os.write(jsonData.getBytes("UTF-8"));
-                }
-            }
-
-            int statusCode = connection.getResponseCode();
-
-            if (statusCode != 200) {
-                LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
-                        "statusCode = " + statusCode);
-            } else {
-                LOG.info("Metrics posted to Collector " + connectUrl);
-            }
-            return true;
-        } catch (IOException ioe) {
-            LOG.error(ioe.getMessage());
-        }
-        return false;
-    }
-
-    private String constructTimelineMetricUri() {
-        StringBuilder sb = new StringBuilder(protocol);
-        sb.append("://");
-        sb.append(collectorHost);
-        sb.append(":");
-        sb.append(port);
-        sb.append(WS_V1_TIMELINE_METRICS);
-        return sb.toString();
-    }
-}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java
deleted file mode 100644
index 3989c67..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.spark;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
-import org.apache.ambari.metrics.alertservice.methods.ema.EmaModel;
-import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel;
-import org.apache.ambari.metrics.alertservice.methods.ema.EmaModelLoader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-import scala.Tuple2;
-
-import java.util.*;
-
-public class MetricAnomalyDetector {
-
-    private static final Log LOG = LogFactory.getLog(MetricAnomalyDetector.class);
-    private static String groupId = "ambari-metrics-group";
-    private static String topicName = "ambari-metrics-topic";
-    private static int numThreads = 1;
-
-    //private static String zkQuorum = "avijayan-ams-1.openstacklocal:2181,avijayan-ams-2.openstacklocal:2181,avijayan-ams-3.openstacklocal:2181";
-    //private static Map<String, String> kafkaParams = new HashMap<>();
-    //static {
-    //    kafkaParams.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "avijayan-ams-2.openstacklocal:6667");
-    //    kafkaParams.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-    //    kafkaParams.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonSerializer");
-    //    kafkaParams.put("metadata.broker.list", "avijayan-ams-2.openstacklocal:6667");
-    //}
-
-    public MetricAnomalyDetector() {
-    }
-
-    public static void main(String[] args) throws InterruptedException {
-
-
-        if (args.length < 6) {
-            System.err.println("Usage: MetricAnomalyDetector <method1,method2> <appid1,appid2> <collector_host> <port> <protocol> <zkQuorum>");
-            System.exit(1);
-        }
-
-        List<String> appIds = Arrays.asList(args[1].split(","));
-        String collectorHost = args[2];
-        String collectorPort = args[3];
-        String collectorProtocol = args[4];
-        String zkQuorum = args[5];
-
-        List<MetricAnomalyModel> anomalyDetectionModels = new ArrayList<>();
-        AnomalyMetricPublisher anomalyMetricPublisher = new AnomalyMetricPublisher(collectorHost, collectorProtocol, collectorPort);
-
-        SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector");
-
-        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
-
-        for (String method : args[0].split(",")) {
-            if (method.equals("ema")) {
-                LOG.info("Model EMA requested.");
-                EmaModel emaModel = new EmaModelLoader().load(jssc.sparkContext().sc(), "/tmp/model/ema");
-                anomalyDetectionModels.add(emaModel);
-            }
-        }
-
-        JavaPairReceiverInputDStream<String, String> messages =
-                KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));
-
-        //Convert JSON string to TimelineMetrics.
-        JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() {
-            @Override
-            public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
-                ObjectMapper mapper = new ObjectMapper();
-                TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class);
-                return metrics;
-            }
-        });
-
-        //Group TimelineMetric by AppId.
-        JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
-                timelineMetrics -> new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(),timelineMetrics)
-        );
-
-        appMetricStream.print();
-
-        //Filter AppIds that are not needed.
-        JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
-            @Override
-            public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
-                return appIds.contains(appMetricTuple._1);
-            }
-        });
-
-        filteredAppMetricStream.print();
-
-        filteredAppMetricStream.foreachRDD(rdd -> {
-            rdd.foreach(
-                    tuple2 -> {
-                        TimelineMetrics metrics = tuple2._2();
-                        for (TimelineMetric metric : metrics.getMetrics()) {
-
-                            TimelineMetric timelineMetric =
-                                    new TimelineMetric(metric.getMetricName(), metric.getAppId(), metric.getHostName(), metric.getMetricValues());
-
-                            for (MetricAnomalyModel model : anomalyDetectionModels) {
-                                List<MetricAnomaly> anomalies = model.test(timelineMetric);
-                                anomalyMetricPublisher.publish(anomalies);
-                                for (MetricAnomaly anomaly : anomalies) {
-                                    LOG.info(anomaly.getAnomalyAsString());
-                                }
-
-                            }
-                        }
-                    });
-        });
-
-        jssc.start();
-        jssc.awaitTermination();
-    }
-}
-
-
-
-
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
index b25e79d..bca3366 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
@@ -25,12 +25,9 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval
   granularity <- train_data[2,1] - train_data[1,1]
   test_start <- test_data[1,1]
   test_end <- test_data[length(test_data[1,]),1]
-  cat ("\n test_start : ", as.numeric(test_start))
   train_start <- test_start - num_historic_periods*period
-  cat ("\n train_start : ", as.numeric(train_start))
   # round to start of day
   train_start <- train_start - (train_start %% interval)
-  cat ("\n train_start after rounding: ", as.numeric(train_start))
 
   time <- as.POSIXlt(as.numeric(test_data[1,1])/1000, origin = "1970-01-01" ,tz = "GMT")
   test_data_day <- time$wday
@@ -39,7 +36,6 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval
   for ( i in length(train_data[,1]):1) {
     ts <- train_data[i,1]
     if ( ts < train_start) {
-      cat ("\n Breaking out of loop : ", ts)
       break
     }
     time <- as.POSIXlt(as.numeric(ts)/1000, origin = "1970-01-01" ,tz = "GMT")
@@ -49,20 +45,14 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval
     }
   }
 
-  cat ("\n Train data length : ", length(train_data[,1]))
-  cat ("\n Test data length : ", length(test_data[,1]))
-  cat ("\n Historic data length : ", length(h_data))
   if (length(h_data) < 2*length(test_data[,1])) {
     cat ("\nNot enough training data")
     return (anomalies)
   }
 
   past_median <- median(h_data)
-  cat ("\npast_median : ", past_median)
   past_sd <- sd(h_data)
-  cat ("\npast_sd : ", past_sd)
   curr_median <- median(test_data[,2])
-  cat ("\ncurr_median : ", curr_median)
 
   if (abs(curr_median - past_median) > n * past_sd) {
     anomaly <- c(test_start, test_end, curr_median, past_median, past_sd)
@@ -70,7 +60,7 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval
   }
 
   if(length(anomalies) > 0) {
-    names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", " Past SD")
+    names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", "Past SD")
   }
 
   return (anomalies)
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
index b4dfdcb..f22bc15 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
@@ -24,7 +24,7 @@ ams_ks <- function(train_data, test_data, p_value) {
 #  test_data <- data[which(data$TS >= test_start & data$TS <= test_end), 2]
   
   anomalies <- data.frame()
-  res <- ks.test(train_data, test_data[,2])
+  res <- ks.test(train_data[,2], test_data[,2])
   
   if (res[2] < p_value) {
     anomaly <- c(test_data[1,1], test_data[length(test_data),1], res[1], res[2])
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
index 7fffbdd..f33b6ec 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
@@ -32,12 +32,17 @@ ams_tukeys <- function(train_data, test_data, n) {
     lb <- quantiles[2] - n*iqr
     ub <- quantiles[4] + n*iqr
     if ( (x < lb)  || (x > ub) ) {
-      anomaly <- c(test_data[i,1], x)
+      if (x < lb) {
+        niqr <- (quantiles[2] - x) / iqr
+      } else {
+        niqr <- (x - quantiles[4]) / iqr
+      }
+      anomaly <- c(test_data[i,1], x, niqr)
       anomalies <- rbind(anomalies, anomaly)
     }
   }
   if(length(anomalies) > 0) {
-    names(anomalies) <- c("TS", "Value")
+    names(anomalies) <- c("TS", "Value", "niqr")
   }
   return (anomalies)
 }
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R
deleted file mode 100644
index 3827006..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#url_prefix = 'http://104.196.95.78:3000/api/datasources/proxy/1/ws/v1/timeline/metrics?'
-#url_suffix = '&startTime=1459972944&endTime=1491508944&precision=MINUTES'
-#data_url <- paste(url_prefix, query, sep ="")
-#data_url <- paste(data_url, url_suffix, sep="")
-
-get_data <- function(url) {
-  library(rjson)
-  res <- fromJSON(readLines(url)[1])
-  return (res)
-}
-
-find_index <- function(data, ts) {
-  for (i in 1:length(data)) {
-    if (as.numeric(ts) == as.numeric(data[i])) {
-      return (i)
-    }
-  }
-  return (-1)
-}
\ No newline at end of file
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
new file mode 100644
index 0000000..539ca40
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.TreeMap;
+
+public class TestEmaTechnique {
+  // Exercises EmaTechnique construction and its test()/updateModel() cycle on one metric.
+  @Test
+  public void testEmaInitialization() {
+    // A new instance tracks no EMAs and reports the two constructor arguments back.
+    EmaTechnique ema = new EmaTechnique(0.5, 3);
+    Assert.assertTrue(ema.getTrackedEmas().isEmpty());
+    Assert.assertTrue(ema.getStartingWeight() == 0.5);
+    Assert.assertTrue(ema.getStartTimesSdev() == 3);
+  }
+
+  @Test
+  public void testEma() {
+    EmaTechnique ema = new EmaTechnique(0.5, 3);
+
+    long now = System.currentTimeMillis();
+    // Metric identity (M1 / H1 / A1) stays the same for every batch fed to the technique.
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("M1");
+    metric1.setHostName("H1");
+    metric1.setStartTime(now - 1000);
+    metric1.setAppId("A1");
+    metric1.setInstanceId(null);
+    metric1.setType("Integer");
+
+    // Train: first batch of 50 values in [20000, 20001), one every 100 ms going back from now.
+    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    for (int i = 0; i < 50; i++) {
+      double metric = 20000 + Math.random();
+      metricValues.put(now - i * 100, metric);
+    }
+    metric1.setMetricValues(metricValues);
+    List<MetricAnomaly> anomalyList = ema.test(metric1);
+//    Assert.assertTrue(anomalyList.isEmpty());
+    // Second batch: same timestamps, fresh random values; this one is expected to report anomalies.
+    metricValues = new TreeMap<Long, Double>();
+    for (int i = 0; i < 50; i++) {
+      double metric = 20000 + Math.random();
+      metricValues.put(now - i * 100, metric);
+    }
+    metric1.setMetricValues(metricValues);
+    anomalyList = ema.test(metric1);
+    Assert.assertTrue(!anomalyList.isEmpty());
+    int l1 = anomalyList.size();
+    // After updateModel(metric1, false, 20) re-testing must report fewer anomalies than l1.
+    Assert.assertTrue(ema.updateModel(metric1, false, 20));
+    anomalyList = ema.test(metric1);
+    int l2 = anomalyList.size();
+    Assert.assertTrue(l2 < l1);
+    // After updateModel(metric1, true, 50) the count must exceed both earlier counts.
+    Assert.assertTrue(ema.updateModel(metric1, true, 50));
+    anomalyList = ema.test(metric1);
+    int l3 = anomalyList.size();
+    Assert.assertTrue(l3 > l2 && l3 > l1);
+
+  }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java
new file mode 100644
index 0000000..9a102a0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.seriesgenerator.UniformMetricSeries;
+import org.apache.commons.lang.ArrayUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class TestRFunctionInvoker {
+
+  private static String metricName = "TestMetric";
+  private static double[] ts;
+  private static String fullFilePath;
+
+  /**
+   * Skips every test in this class unless the R_HOME environment variable is set,
+   * then prepares the shared timestamps and registers the R scripts directory.
+   */
+  @BeforeClass
+  public static void init() throws URISyntaxException {
+    Assume.assumeTrue(System.getenv("R_HOME") != null);
+    ts = getTS(1000);
+    fullFilePath = resolveScriptsDir();
+    RFunctionInvoker.setScriptsDir(fullFilePath);
+  }
+
+  /**
+   * Returns the absolute path of the "R-scripts" classpath resource.
+   * @throws IllegalStateException if the resource is not on the classpath
+   * @throws URISyntaxException if the resource URL cannot be converted to a URI
+   */
+  private static String resolveScriptsDir() throws URISyntaxException {
+    URL url = ClassLoader.getSystemResource("R-scripts");
+    if (url == null) {
+      // Fail with a readable message rather than a bare NullPointerException.
+      throw new IllegalStateException("R-scripts directory not found on the classpath");
+    }
+    return new File(url.toURI()).getAbsolutePath();
+  }
+
+  /**
+   * Feeds tukeys 750 training points and 250 test points, one of which is
+   * overwritten with 5.5, and checks the returned result set.
+   */
+  @Test
+  public void testTukeys() throws URISyntaxException {
+    double[] train_ts = ArrayUtils.subarray(ts, 0, 750);
+    double[] train_x = getRandomData(750);
+    DataSeries trainData = new DataSeries(metricName, train_ts, train_x);
+
+    double[] test_ts = ArrayUtils.subarray(ts, 750, 1000);
+    double[] test_x = getRandomData(250);
+    test_x[50] = 5.5; //Anomaly
+    DataSeries testData = new DataSeries(metricName, test_ts, test_x);
+    Map<String, String> configs = new HashMap<String, String>();
+    configs.put("tukeys.n", "3");
+
+    ResultSet rs = RFunctionInvoker.tukeys(trainData, testData, configs);
+    // JUnit expects (expected, actual); the reverse order garbles failure messages.
+    Assert.assertEquals(2, rs.resultset.size());
+    Assert.assertEquals(5.5, rs.resultset.get(1)[0], 0.1);
+  }
+
+  /**
+   * Manual driver: prints the tukeys result for data built as in testTukeys, then
+   * the hsdev result for a 5000-point series whose last 200 values are scaled by 5.
+   */
+  public static void main(String[] args) throws URISyntaxException {
+    String metricName = "TestMetric";
+    double[] ts = getTS(1000);
+    RFunctionInvoker.setScriptsDir(resolveScriptsDir());
+
+    double[] train_ts = ArrayUtils.subarray(ts, 0, 750);
+    double[] train_x = getRandomData(750);
+    DataSeries trainData = new DataSeries(metricName, train_ts, train_x);
+
+    double[] test_ts = ArrayUtils.subarray(ts, 750, 1000);
+    double[] test_x = getRandomData(250);
+    test_x[50] = 5.5; //Anomaly
+    DataSeries testData = new DataSeries(metricName, test_ts, test_x);
+    ResultSet rs;
+
+    Map<String, String> configs = new HashMap<String, String>();
+
+    System.out.println("TUKEYS");
+    configs.put("tukeys.n", "3");
+    rs = RFunctionInvoker.tukeys(trainData, testData, configs);
+    rs.print();
+    System.out.println("--------------");
+
+    // The ema_global, ema_daily and ksTest runs that used to sit here are disabled.
+
+    ts = getTS(5000);
+    train_ts = ArrayUtils.subarray(ts, 0, 4800);
+    train_x = getRandomData(4800);
+    trainData = new DataSeries(metricName, train_ts, train_x);
+    test_ts = ArrayUtils.subarray(ts, 4800, 5000);
+    test_x = getRandomData(200);
+    for (int i = 0; i < 200; i++) {
+      test_x[i] = test_x[i] * 5;
+    }
+    testData = new DataSeries(metricName, test_ts, test_x);
+    configs.put("hsdev.n", "3");
+    configs.put("hsdev.nhp", "3");
+    configs.put("hsdev.interval", "86400000");
+    configs.put("hsdev.period", "604800000");
+    System.out.println("HSdev");
+    rs = RFunctionInvoker.hsdev(trainData, testData, configs);
+    rs.print();
+    System.out.println("--------------");
+  }
+
+  /**
+   * Builds n ascending timestamps (as doubles) spaced five minutes apart; the last
+   * one is the current time rounded down to a five-minute boundary.
+   */
+  static double[] getTS(int n) {
+    final long stepMillis = 5 * 60 * 1000;
+    long currentTime = System.currentTimeMillis();
+    currentTime = currentTime - (currentTime % stepMillis);
+    double[] ts = new double[n];
+    for (int j = n - 1; j >= 0; j--) {
+      ts[j] = currentTime;
+      currentTime = currentTime - stepMillis;
+    }
+    return ts;
+  }
+
+  /** Returns n values drawn from a UniformMetricSeries with fixed test parameters. */
+  static double[] getRandomData(int n) {
+    UniformMetricSeries metricSeries =  new UniformMetricSeries(10, 0.1,0.05, 0.6, 0.8, true);
+    return metricSeries.getSeries(n);
+  }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
new file mode 100644
index 0000000..bb409cf
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.TreeMap;
+
+public class TestTukeys {
+
+  /** Skips the class when the R_HOME environment variable is not set. */
+  @BeforeClass
+  public static void init() throws URISyntaxException {
+    boolean rHomeDefined = System.getenv("R_HOME") != null;
+    Assume.assumeTrue(rHomeDefined);
+  }
+
+  /**
+   * Emits two hours of per-minute values through MetricsCollectorInterface, runs the
+   * EMA technique over the same series and publishes the anomalies it returns.
+   *
+   * NOTE(review): the collector host is a hard-coded machine name and the test makes
+   * no assertions - confirm whether it is meant to run in regular builds.
+   */
+  @Test
+  public void testPointInTimeDetectionSystem() throws UnknownHostException, URISyntaxException {
+
+    // Point RFunctionInvoker at the "R-scripts" classpath resource.
+    URL scriptsUrl = ClassLoader.getSystemResource("R-scripts");
+    File scriptsDir = new File(scriptsUrl.toURI());
+    RFunctionInvoker.setScriptsDir(scriptsDir.getAbsolutePath());
+
+    MetricsCollectorInterface collector =
+      new MetricsCollectorInterface("avijayan-ams-1.openstacklocal","http", "6188");
+
+    EmaTechnique technique = new EmaTechnique(0.5, 3);
+    final long now = System.currentTimeMillis();
+
+    TimelineMetric series = new TimelineMetric();
+    series.setMetricName("mm9");
+    series.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+    series.setStartTime(now);
+    series.setAppId("aa9");
+    series.setInstanceId(null);
+    series.setType("Integer");
+
+    // 120 one-minute samples (two hours) going back from "now", each in [20000, 20001).
+    TreeMap<Long, Double> samples = new TreeMap<Long, Double>();
+    for (int minute = 0; minute < 120; minute++) {
+      samples.put(now - minute * 60 * 1000, 20000 + Math.random());
+    }
+    series.setMetricValues(samples);
+
+    // Send the series to the collector first.
+    TimelineMetrics batch = new TimelineMetrics();
+    batch.addOrMergeTimelineMetric(series);
+    collector.emitMetrics(batch);
+
+    // Then test it locally and publish the result.
+    List<MetricAnomaly> anomalies = technique.test(series);
+    collector.publish(anomalies);
+
+    // Disabled follow-up, summarised here instead of being kept as commented-out
+    // code:
+    //   1. create PointInTimeADSystem(technique, collector, 3,
+    //      5*60*1000, 15*60*1000);
+    //   2. call runOnce() five times, running technique.test(series) after each
+    //      call;
+    //   3. assert that the anomaly list from the last run is smaller than the
+    //      list obtained above.
+  }
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java
new file mode 100644
index 0000000..575ea8b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ambari.metrics.alertservice.prototype.MetricAnomalyDetectorTestInput;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetricSeriesGeneratorTest {
+
+  /** One draw must lie in [4, 6]; series values above 12 must lie in [14, 16], others in [8, 12]. */
+  @Test
+  public void testUniformSeries() {
+    UniformMetricSeries metricSeries = new UniformMetricSeries(5, 0.2, 0, 0, 0, true);
+    // Check both bounds on one sample; calling nextValue() twice compared two different draws.
+    double sample = metricSeries.nextValue();
+    Assert.assertTrue(sample <= 6 && sample >= 4);
+
+    double[] uniformSeries = MetricSeriesGeneratorFactory.createUniformSeries(50, 10, 0.2, 0.1, 0.4, 0.5, true);
+    Assert.assertEquals(50, uniformSeries.length);
+    for (double value : uniformSeries) {
+      if (value > 10 * 1.2) {
+        Assert.assertTrue(value >= 10 * 1.4 && value <= 10 * 1.6);
+      } else {
+        Assert.assertTrue(value >= 10 * 0.8 && value <= 10 * 1.2);
+      }
+    }
+  }
+
+  /** A single draw from NormalMetricSeries(0, 1, ...) is expected to lie in [-3, 3]. */
+  @Test
+  public void testNormalSeries() {
+    NormalMetricSeries metricSeries = new NormalMetricSeries(0, 1, 0, 0, 0, true);
+    double sample = metricSeries.nextValue();
+    Assert.assertTrue(sample <= 3 && sample >= -3);
+  }
+
+  /** The series built from (0, 0.5, ...) must equal i * 0.5 at index i, with no tolerance. */
+  @Test
+  public void testMonotonicSeries() {
+    MonotonicMetricSeries metricSeries = new MonotonicMetricSeries(0, 0.5, 0, 0, 0, 0, true);
+    Assert.assertEquals(0.0, metricSeries.nextValue(), 0.0);
+    Assert.assertEquals(0.5, metricSeries.nextValue(), 0.0);
+
+    double[] incSeries = MetricSeriesGeneratorFactory.createMonotonicSeries(20, 0, 0.5, 0, 0, 0, 0, true);
+    Assert.assertEquals(20, incSeries.length);
+    for (int i = 0; i < incSeries.length; i++) {
+      Assert.assertEquals(i * 0.5, incSeries[i], 0.0);
+    }
+  }
+
+  /** Indices 0, 4 and 9 must lie in [4, 6]; indices 5 and 8 must lie in [10.5, 19.5]. */
+  @Test
+  public void testDualBandSeries() {
+    double[] dualBandSeries = MetricSeriesGeneratorFactory.getDualBandSeries(30, 5, 0.2, 5, 15, 0.3, 4);
+    Assert.assertTrue(dualBandSeries[0] >= 4 && dualBandSeries[0] <= 6);
+    Assert.assertTrue(dualBandSeries[4] >= 4 && dualBandSeries[4] <= 6);
+    Assert.assertTrue(dualBandSeries[5] >= 10.5 && dualBandSeries[5] <= 19.5);
+    Assert.assertTrue(dualBandSeries[8] >= 10.5 && dualBandSeries[8] <= 19.5);
+    Assert.assertTrue(dualBandSeries[9] >= 4 && dualBandSeries[9] <= 6);
+  }
+
+  /** Indices 0-4 must equal 10, indices 5-9 must equal 15, indices 10-14 must equal 22.5. */
+  @Test
+  public void testStepSeries() {
+    double[] stepSeries = MetricSeriesGeneratorFactory.getStepFunctionSeries(30, 10, 0, 0, 5, 5, 0.5, true);
+    Assert.assertEquals(10.0, stepSeries[0], 0.0);
+    Assert.assertEquals(10.0, stepSeries[4], 0.0);
+    Assert.assertEquals(10*1.5, stepSeries[5], 0.0);
+    Assert.assertEquals(10*1.5, stepSeries[9], 0.0);
+    Assert.assertEquals(10*1.5*1.5, stepSeries[10], 0.0);
+    Assert.assertEquals(10*1.5*1.5, stepSeries[14], 0.0);
+  }
+
+  /** Exactly five of the generated points are expected to equal 10. */
+  @Test
+  public void testSteadySeriesWithTurbulence() {
+    double[] steadySeriesWithTurbulence = MetricSeriesGeneratorFactory.getSteadySeriesWithTurbulentPeriod(30, 5, 0, 1, 1, 5, 1);
+    int count = 0;
+    for (double value : steadySeriesWithTurbulence) {
+      if (value == 10) {
+        count++;
+      }
+    }
+    Assert.assertEquals(5, count);
+  }
+}
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
index 1f03fe9..3dfcf4e 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
@@ -34,11 +35,11 @@ import org.codehaus.jackson.map.annotate.JsonDeserialize;
 @XmlAccessorType(XmlAccessType.NONE)
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
-public class TimelineMetric implements Comparable<TimelineMetric> {
+public class TimelineMetric implements Comparable<TimelineMetric>, Serializable {
 
   private String metricName;
   private String appId;
-  private String instanceId;
+  private String instanceId = null;
   private String hostName;
   private long startTime;
   private String type;
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
index 0c5965c..a8d3da8 100644
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -35,7 +36,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 @XmlAccessorType(XmlAccessType.NONE)
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
-public class TimelineMetrics {
+public class TimelineMetrics implements Serializable{
 
   private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
 
diff --git a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
index bff094b..e51a47f 100644
--- a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
+++ b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
@@ -21,13 +21,13 @@ import java.util
 import java.util.logging.LogManager
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.ambari.metrics.alertservice.prototype.MetricsCollectorInterface
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.kafka._
-import org.apache.ambari.metrics.alertservice.common.{MetricAnomaly, TimelineMetrics}
-import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel
-import org.apache.ambari.metrics.alertservice.methods.ema.{EmaModel, EmaModelLoader}
-import org.apache.ambari.metrics.alertservice.spark.AnomalyMetricPublisher
+import org.apache.ambari.metrics.alertservice.prototype.methods.{AnomalyDetectionTechnique, MetricAnomaly}
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.{EmaModelLoader, EmaTechnique}
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics
 import org.apache.log4j.Logger
 import org.apache.spark.storage.StorageLevel
 
@@ -41,7 +41,7 @@ object MetricAnomalyDetector extends Logging {
   var groupId = "ambari-metrics-group"
   var topicName = "ambari-metrics-topic"
   var numThreads = 1
-  val anomalyDetectionModels: Array[MetricAnomalyModel] = Array[MetricAnomalyModel]()
+  val anomalyDetectionModels: Array[AnomalyDetectionTechnique] = Array[AnomalyDetectionTechnique]()
 
   def main(args: Array[String]): Unit = {
 
@@ -54,7 +54,7 @@ object MetricAnomalyDetector extends Logging {
     }
 
     for (method <- args(0).split(",")) {
-      if (method == "ema") anomalyDetectionModels :+ new EmaModel()
+      if (method == "ema") anomalyDetectionModels :+ new EmaTechnique(0.5, 3)
     }
 
     val appIds = util.Arrays.asList(args(1).split(","))
@@ -63,7 +63,7 @@ object MetricAnomalyDetector extends Logging {
     val collectorPort = args(3)
     val collectorProtocol = args(4)
 
-    val anomalyMetricPublisher: AnomalyMetricPublisher = new AnomalyMetricPublisher(collectorHost, collectorProtocol, collectorPort)
+    val anomalyMetricPublisher: MetricsCollectorInterface = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort)
 
     val sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector")
 
@@ -99,10 +99,6 @@ object MetricAnomalyDetector extends Logging {
         for (timelineMetric <- timelineMetrics.getMetrics) {
           var anomalies = emaModel.test(timelineMetric)
           anomalyMetricPublisher.publish(anomalies)
-          for (anomaly <- anomalies) {
-            var an = anomaly : MetricAnomaly
-            logger.info(an.getAnomalyAsString)
-          }
         }
       })
     })
diff --git a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
index 3c8e1ed..edd6366 100644
--- a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
+++ b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
@@ -17,8 +17,8 @@
 
 package org.apache.ambari.metrics.spark
 
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric
-import org.apache.ambari.metrics.alertservice.methods.ema.EmaModel
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric
 import org.apache.spark.mllib.stat.Statistics
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.{SparkConf, SparkContext}
@@ -61,15 +61,19 @@ object SparkPhoenixReader {
       t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5))
     )
 
-    //val metricName = result.head().getString(0)
+    //val seriesName = result.head().getString(0)
     //val hostname = result.head().getString(1)
     //val appId = result.head().getString(2)
 
-    val timelineMetric = new TimelineMetric(metricName, appId, hostname, metricValues)
+    val timelineMetric = new TimelineMetric()
+    timelineMetric.setMetricName(metricName)
+    timelineMetric.setAppId(appId)
+    timelineMetric.setHostName(hostname)
+    timelineMetric.setMetricValues(metricValues)
 
-    var emaModel = new EmaModel()
-    emaModel.train(timelineMetric, weight, timessdev)
-    emaModel.save(sc, modelDir)
+//    var emaModel = new EmaTechnique()
+//    emaModel.train(timelineMetric, weight, timessdev)
+//    emaModel.save(sc, modelDir)
 
 //    var metricData:Seq[Double] = Seq.empty
 //    result.collect().foreach(
diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
index d306ad3..a8ac1da 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
@@ -348,7 +348,7 @@
     <dependency>
       <groupId>org.apache.ambari</groupId>
       <artifactId>ambari-metrics-alertservice</artifactId>
-      <version>2.0.0.0-SNAPSHOT</version>
+      <version>${project.version}</version>
     </dependency>
 
     <dependency>
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
index 110b094..95682f9 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@ -407,30 +407,6 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
   }
 
 
-  private org.apache.ambari.metrics.alertservice.common.TimelineMetrics fromTimelineMetrics(TimelineMetrics timelineMetrics) {
-    org.apache.ambari.metrics.alertservice.common.TimelineMetrics otherMetrics = new org.apache.ambari.metrics.alertservice.common.TimelineMetrics();
-
-    List<org.apache.ambari.metrics.alertservice.common.TimelineMetric> timelineMetricList = new ArrayList<>();
-    for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
-      timelineMetricList.add(fromTimelineMetric(timelineMetric));
-    }
-    otherMetrics.setMetrics(timelineMetricList);
-    return otherMetrics;
-  }
-
-  private org.apache.ambari.metrics.alertservice.common.TimelineMetric fromTimelineMetric(TimelineMetric timelineMetric) {
-
-    org.apache.ambari.metrics.alertservice.common.TimelineMetric otherMetric = new org.apache.ambari.metrics.alertservice.common.TimelineMetric();
-    otherMetric.setMetricValues(timelineMetric.getMetricValues());
-    otherMetric.setStartTime(timelineMetric.getStartTime());
-    otherMetric.setHostName(timelineMetric.getHostName());
-    otherMetric.setInstanceId(timelineMetric.getInstanceId());
-    otherMetric.setAppId(timelineMetric.getAppId());
-    otherMetric.setMetricName(timelineMetric.getMetricName());
-
-    return otherMetric;
-  }
-
   @Override
   public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
       throws SQLException, IOException {
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
index 4ec624a..b57f7e9 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
@@ -39,7 +39,6 @@ public class MetricsPaddingMethodTest {
     timelineMetric.setMetricName("m1");
     timelineMetric.setHostName("h1");
     timelineMetric.setAppId("a1");
-    timelineMetric.setTimestamp(now);
     TreeMap<Long, Double> inputValues = new TreeMap<>();
     inputValues.put(now - 1000, 1.0d);
     inputValues.put(now - 2000, 2.0d);
@@ -67,7 +66,6 @@ public class MetricsPaddingMethodTest {
     timelineMetric.setMetricName("m1");
     timelineMetric.setHostName("h1");
     timelineMetric.setAppId("a1");
-    timelineMetric.setTimestamp(now);
     TreeMap<Long, Double> inputValues = new TreeMap<>();
     inputValues.put(now - 1000, 1.0d);
     inputValues.put(now - 2000, 2.0d);
@@ -95,7 +93,6 @@ public class MetricsPaddingMethodTest {
     timelineMetric.setMetricName("m1");
     timelineMetric.setHostName("h1");
     timelineMetric.setAppId("a1");
-    timelineMetric.setTimestamp(now);
     TreeMap<Long, Double> inputValues = new TreeMap<>();
     inputValues.put(now, 0.0d);
     inputValues.put(now - 1000, 1.0d);
@@ -123,7 +120,6 @@ public class MetricsPaddingMethodTest {
     timelineMetric.setMetricName("m1");
     timelineMetric.setHostName("h1");
     timelineMetric.setAppId("a1");
-    timelineMetric.setTimestamp(now);
     TreeMap<Long, Double> inputValues = new TreeMap<>();
     inputValues.put(now - 1000, 1.0d);
     timelineMetric.setMetricValues(inputValues);
@@ -149,7 +145,6 @@ public class MetricsPaddingMethodTest {
     timelineMetric.setMetricName("m1");
     timelineMetric.setHostName("h1");
     timelineMetric.setAppId("a1");
-    timelineMetric.setTimestamp(now);
     TreeMap<Long, Double> inputValues = new TreeMap<>();
     inputValues.put(now - 1000, 1.0d);
     timelineMetric.setMetricValues(inputValues);
@@ -173,7 +168,6 @@ public class MetricsPaddingMethodTest {
     timelineMetric.setMetricName("m1");
     timelineMetric.setHostName("h1");
     timelineMetric.setAppId("a1");
-    timelineMetric.setTimestamp(now);
     TreeMap<Long, Double> inputValues = new TreeMap<>();
 
     long seconds = 1000;
@@ -234,7 +228,6 @@ public class MetricsPaddingMethodTest {
     timelineMetric.setMetricName("m1");
     timelineMetric.setHostName("h1");
     timelineMetric.setAppId("a1");
-    timelineMetric.setTimestamp(now);
     TreeMap<Long, Double> inputValues = new TreeMap<>();
     inputValues.put(now - 100, 1.0d);
     inputValues.put(now - 200, 2.0d);

-- 
To stop receiving notification emails like this one, please contact
avijayan@apache.org.