You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2017/09/27 22:03:11 UTC

[5/6] ambari git commit: AMBARI-22077 : Create maven module and package structure for the anomaly detection engine. (Commit 2) (avijayan)

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java
new file mode 100644
index 0000000..251603b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.methods;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetricAnomaly implements Serializable{
+
+  private String methodType;
+  private double anomalyScore;
+  private String metricKey;
+  private long timestamp;
+  private double metricValue;
+
+
+  public MetricAnomaly(String metricKey, long timestamp, double metricValue, String methodType, double anomalyScore) {
+    this.metricKey = metricKey;
+    this.timestamp = timestamp;
+    this.metricValue = metricValue;
+    this.methodType = methodType;
+    this.anomalyScore = anomalyScore;
+
+  }
+
+  public String getMethodType() {
+    return methodType;
+  }
+
+  public void setMethodType(String methodType) {
+    this.methodType = methodType;
+  }
+
+  public double getAnomalyScore() {
+    return anomalyScore;
+  }
+
+  public void setAnomalyScore(double anomalyScore) {
+    this.anomalyScore = anomalyScore;
+  }
+
+  public void setMetricKey(String metricKey) {
+    this.metricKey = metricKey;
+  }
+
+  public String getMetricKey() {
+    return metricKey;
+  }
+
+  public void setMetricName(String metricName) {
+    this.metricKey = metricName;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  public double getMetricValue() {
+    return metricValue;
+  }
+
+  public void setMetricValue(double metricValue) {
+    this.metricValue = metricValue;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModel.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModel.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModel.java
new file mode 100644
index 0000000..593028e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModel.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.methods.ema;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+
+import static org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique.suppressAnomaliesTheshold;
+
+@XmlRootElement
+public class EmaModel implements Serializable {
+
+  private String metricName;
+  private String hostname;
+  private String appId;
+  private double ema;
+  private double ems;
+  private double weight;
+  private double timessdev;
+
+  private int ctr = 0;
+
+  private static final Log LOG = LogFactory.getLog(EmaModel.class);
+
+  public EmaModel(String name, String hostname, String appId, double weight, double timessdev) {
+    this.metricName = name;
+    this.hostname = hostname;
+    this.appId = appId;
+    this.weight = weight;
+    this.timessdev = timessdev;
+    this.ema = 0.0;
+    this.ems = 0.0;
+  }
+
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public String getHostname() {
+    return hostname;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public double testAndUpdate(double metricValue) {
+
+    double anomalyScore = 0.0;
+    LOG.info("Before Update ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + ", timessdev = " + timessdev);
+    update(metricValue);
+    if (ctr > suppressAnomaliesTheshold) {
+      anomalyScore = test(metricValue);
+      if (anomalyScore > 0.0) {
+        LOG.info("Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems +
+          ", timessdev = " + timessdev + ", metricValue = " + metricValue);
+      } else {
+        LOG.info("Not an Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems +
+          ", timessdev = " + timessdev + ", metricValue = " + metricValue);
+      }
+    } else {
+      ctr++;
+      if (ctr > suppressAnomaliesTheshold) {
+        LOG.info("Ema Model for " + metricName + ":" + appId + ":" + hostname + " is ready for testing data.");
+      }
+    }
+    return anomalyScore;
+  }
+
+  public void update(double metricValue) {
+    ema = weight * ema + (1 - weight) * metricValue;
+    ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0));
+    LOG.debug("In update : ema = " + ema + ", ems = " + ems);
+  }
+
+  public double test(double metricValue) {
+    LOG.debug("In test : ema = " + ema + ", ems = " + ems);
+    double diff = Math.abs(ema - metricValue) - (timessdev * ems);
+    LOG.debug("diff = " + diff);
+    if (diff > 0) {
+      return Math.abs((metricValue - ema) / ems); //Z score
+    } else {
+      return 0.0;
+    }
+  }
+
+  public void updateModel(boolean increaseSensitivity, double percent) {
+    LOG.info("Updating model for " + metricName + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent);
+    double delta = percent / 100;
+    if (increaseSensitivity) {
+      delta = delta * -1;
+    }
+    this.timessdev = timessdev + delta * timessdev;
+    //this.weight = Math.min(1.0, weight + delta * weight);
+    LOG.info("New model parameters " + metricName + " : timessdev = " + timessdev + ", weight = " + weight);
+  }
+
+  public double getWeight() {
+    return weight;
+  }
+
+  public void setWeight(double weight) {
+    this.weight = weight;
+  }
+
+  public double getTimessdev() {
+    return timessdev;
+  }
+
+  public void setTimessdev(double timessdev) {
+    this.timessdev = timessdev;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModelLoader.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModelLoader.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModelLoader.java
new file mode 100644
index 0000000..7623f27
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModelLoader.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.methods.ema;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.spark.SparkContext;
+import org.apache.spark.mllib.util.Loader;
+
+public class EmaModelLoader implements Loader<EmaTechnique> {
+    private static final Log LOG = LogFactory.getLog(EmaModelLoader.class);
+
+    @Override
+    public EmaTechnique load(SparkContext sc, String path) {
+        return new EmaTechnique(0.5,3);
+//        Gson gson = new Gson();
+//        try {
+//            String fileString = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
+//            return gson.fromJson(fileString, EmaTechnique.class);
+//        } catch (IOException e) {
+//            LOG.error(e);
+//        }
+//        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaTechnique.java
new file mode 100644
index 0000000..7ec17d8
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaTechnique.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.methods.ema;
+
+import com.google.gson.Gson;
+import org.apache.ambari.metrics.adservice.prototype.methods.AnomalyDetectionTechnique;
+import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.spark.SparkContext;
+import org.apache.spark.mllib.util.Saveable;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@XmlRootElement
+public class EmaTechnique extends AnomalyDetectionTechnique implements Serializable, Saveable {
+
+  @XmlElement(name = "trackedEmas")
+  private Map<String, EmaModel> trackedEmas;
+  private static final Log LOG = LogFactory.getLog(EmaTechnique.class);
+
+  private double startingWeight = 0.5;
+  private double startTimesSdev = 3.0;
+  private String methodType = "ema";
+  public static int suppressAnomaliesTheshold = 100;
+
+  public EmaTechnique(double startingWeight, double startTimesSdev, int suppressAnomaliesTheshold) {
+    trackedEmas = new HashMap<>();
+    this.startingWeight = startingWeight;
+    this.startTimesSdev = startTimesSdev;
+    EmaTechnique.suppressAnomaliesTheshold = suppressAnomaliesTheshold;
+    LOG.info("New EmaTechnique......");
+  }
+
+  public EmaTechnique(double startingWeight, double startTimesSdev) {
+    trackedEmas = new HashMap<>();
+    this.startingWeight = startingWeight;
+    this.startTimesSdev = startTimesSdev;
+    LOG.info("New EmaTechnique......");
+  }
+
+  public List<MetricAnomaly> test(TimelineMetric metric) {
+    String metricName = metric.getMetricName();
+    String appId = metric.getAppId();
+    String hostname = metric.getHostName();
+    String key = metricName + ":" + appId + ":" + hostname;
+
+    EmaModel emaModel = trackedEmas.get(key);
+    if (emaModel == null) {
+      LOG.debug("EmaModel not present for " + key);
+      LOG.debug("Number of tracked Emas : " + trackedEmas.size());
+      emaModel  = new EmaModel(metricName, hostname, appId, startingWeight, startTimesSdev);
+      trackedEmas.put(key, emaModel);
+    } else {
+      LOG.debug("EmaModel already present for " + key);
+    }
+
+    List<MetricAnomaly> anomalies = new ArrayList<>();
+
+    for (Long timestamp : metric.getMetricValues().keySet()) {
+      double metricValue = metric.getMetricValues().get(timestamp);
+      double anomalyScore = emaModel.testAndUpdate(metricValue);
+      if (anomalyScore > 0.0) {
+        LOG.info("Found anomaly for : " + key + ", anomalyScore = " + anomalyScore);
+        MetricAnomaly metricAnomaly = new MetricAnomaly(key, timestamp, metricValue, methodType, anomalyScore);
+        anomalies.add(metricAnomaly);
+      } else {
+        LOG.debug("Discarding non-anomaly for : " + key);
+      }
+    }
+    return anomalies;
+  }
+
+  public boolean updateModel(TimelineMetric timelineMetric, boolean increaseSensitivity, double percent) {
+    String metricName = timelineMetric.getMetricName();
+    String appId = timelineMetric.getAppId();
+    String hostname = timelineMetric.getHostName();
+    String key = metricName + "_" + appId + "_" + hostname;
+
+
+    EmaModel emaModel = trackedEmas.get(key);
+
+    if (emaModel == null) {
+      LOG.warn("EMA Model for " + key + " not found");
+      return false;
+    }
+    emaModel.updateModel(increaseSensitivity, percent);
+
+    return true;
+  }
+
+  @Override
+  public void save(SparkContext sc, String path) {
+    Gson gson = new Gson();
+    try {
+      String json = gson.toJson(this);
+      try (Writer writer = new BufferedWriter(new OutputStreamWriter(
+        new FileOutputStream(path), "utf-8"))) {
+        writer.write(json);
+      }
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+
+  @Override
+  public String formatVersion() {
+    return "1.0";
+  }
+
+  public Map<String, EmaModel> getTrackedEmas() {
+    return trackedEmas;
+  }
+
+  public double getStartingWeight() {
+    return startingWeight;
+  }
+
+  public double getStartTimesSdev() {
+    return startTimesSdev;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java
new file mode 100644
index 0000000..6facc99
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.methods.hsdev;
+
+import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import static org.apache.ambari.metrics.adservice.prototype.common.StatisticUtils.median;
+import static org.apache.ambari.metrics.adservice.prototype.common.StatisticUtils.sdev;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HsdevTechnique implements Serializable {
+
+  private Map<String, Double> hsdevMap;
+  private String methodType = "hsdev";
+  private static final Log LOG = LogFactory.getLog(HsdevTechnique.class);
+
+  public HsdevTechnique() {
+    hsdevMap = new HashMap<>();
+  }
+
+  public MetricAnomaly runHsdevTest(String key, DataSeries trainData, DataSeries testData) {
+    int testLength = testData.values.length;
+    int trainLength = trainData.values.length;
+
+    if (trainLength < testLength) {
+      LOG.info("Not enough train data.");
+      return null;
+    }
+
+    if (!hsdevMap.containsKey(key)) {
+      hsdevMap.put(key, 3.0);
+    }
+
+    double n = hsdevMap.get(key);
+
+    double historicSd = sdev(trainData.values, false);
+    double historicMedian = median(trainData.values);
+    double currentMedian = median(testData.values);
+
+
+    if (historicSd > 0) {
+      double diff = Math.abs(currentMedian - historicMedian);
+      LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1]));
+      LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd);
+
+      if (diff > n * historicSd) {
+        double zScore = diff / historicSd;
+        LOG.info("Z Score of current series : " + zScore);
+        return new MetricAnomaly(key,
+          (long) testData.ts[testLength - 1],
+          testData.values[testLength - 1],
+          methodType,
+          zScore);
+      }
+    }
+
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java
new file mode 100644
index 0000000..4727c6f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.prototype.methods.kstest;
+
+import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.adservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class KSTechnique implements Serializable {
+
+  private String methodType = "ks";
+  private Map<String, Double> pValueMap;
+  private static final Log LOG = LogFactory.getLog(KSTechnique.class);
+
+  public KSTechnique() {
+    pValueMap = new HashMap();
+  }
+
+  public MetricAnomaly runKsTest(String key, DataSeries trainData, DataSeries testData) {
+
+    int testLength = testData.values.length;
+    int trainLength = trainData.values.length;
+
+    if (trainLength < testLength) {
+      LOG.info("Not enough train data.");
+      return null;
+    }
+
+    if (!pValueMap.containsKey(key)) {
+      pValueMap.put(key, 0.05);
+    }
+    double pValue = pValueMap.get(key);
+
+    ResultSet result = RFunctionInvoker.ksTest(trainData, testData, Collections.singletonMap("ks.p_value", String.valueOf(pValue)));
+    if (result == null) {
+      LOG.error("Resultset is null when invoking KS R function...");
+      return null;
+    }
+
+    if (result.resultset.size() > 0) {
+
+      LOG.info("Is size 1 ? result size = " + result.resultset.get(0).length);
+      LOG.info("p_value = " + result.resultset.get(3)[0]);
+      double dValue = result.resultset.get(2)[0];
+
+      return new MetricAnomaly(key,
+        (long) testData.ts[testLength - 1],
+        testData.values[testLength - 1],
+        methodType,
+        dValue);
+    }
+
+    return null;
+  }
+
+  public void updateModel(String metricKey, boolean increaseSensitivity, double percent) {
+
+    LOG.info("Updating KS model for " + metricKey + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent);
+
+    if (!pValueMap.containsKey(metricKey)) {
+      LOG.error("Unknown metric key : " + metricKey);
+      LOG.info("pValueMap :" + pValueMap.toString());
+      return;
+    }
+
+    double delta = percent / 100;
+    if (!increaseSensitivity) {
+      delta = delta * -1;
+    }
+
+    double pValue = pValueMap.get(metricKey);
+    double newPValue = Math.min(1.0, pValue + delta * pValue);
+    pValueMap.put(metricKey, newPValue);
+    LOG.info("New pValue = " + newPValue);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java
new file mode 100644
index 0000000..9a002a1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.prototype.testing.utilities;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.List;
+import java.util.Map;
+
+@XmlRootElement
+public class MetricAnomalyDetectorTestInput {
+
+  public MetricAnomalyDetectorTestInput() {
+  }
+
+  //Train data
+  private String trainDataName;
+  private String trainDataType;
+  private Map<String, String> trainDataConfigs;
+  private int trainDataSize;
+
+  //Test data
+  private String testDataName;
+  private String testDataType;
+  private Map<String, String> testDataConfigs;
+  private int testDataSize;
+
+  //Algorithm data
+  private List<String> methods;
+  private Map<String, String> methodConfigs;
+
+  public String getTrainDataName() {
+    return trainDataName;
+  }
+
+  public void setTrainDataName(String trainDataName) {
+    this.trainDataName = trainDataName;
+  }
+
+  public String getTrainDataType() {
+    return trainDataType;
+  }
+
+  public void setTrainDataType(String trainDataType) {
+    this.trainDataType = trainDataType;
+  }
+
+  public Map<String, String> getTrainDataConfigs() {
+    return trainDataConfigs;
+  }
+
+  public void setTrainDataConfigs(Map<String, String> trainDataConfigs) {
+    this.trainDataConfigs = trainDataConfigs;
+  }
+
+  public String getTestDataName() {
+    return testDataName;
+  }
+
+  public void setTestDataName(String testDataName) {
+    this.testDataName = testDataName;
+  }
+
+  public String getTestDataType() {
+    return testDataType;
+  }
+
+  public void setTestDataType(String testDataType) {
+    this.testDataType = testDataType;
+  }
+
+  public Map<String, String> getTestDataConfigs() {
+    return testDataConfigs;
+  }
+
+  public void setTestDataConfigs(Map<String, String> testDataConfigs) {
+    this.testDataConfigs = testDataConfigs;
+  }
+
+  public Map<String, String> getMethodConfigs() {
+    return methodConfigs;
+  }
+
+  public void setMethodConfigs(Map<String, String> methodConfigs) {
+    this.methodConfigs = methodConfigs;
+  }
+
+  public int getTrainDataSize() {
+    return trainDataSize;
+  }
+
+  public void setTrainDataSize(int trainDataSize) {
+    this.trainDataSize = trainDataSize;
+  }
+
+  public int getTestDataSize() {
+    return testDataSize;
+  }
+
+  public void setTestDataSize(int testDataSize) {
+    this.testDataSize = testDataSize;
+  }
+
+  public List<String> getMethods() {
+    return methods;
+  }
+
+  public void setMethods(List<String> methods) {
+    this.methods = methods;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java
new file mode 100644
index 0000000..d079e66
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.prototype.testing.utilities;
+
+import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.adservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.adservice.prototype.core.MetricsCollectorInterface;
+import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Class which was originally used to send test series from AMS to Spark through Kafka.
+ */
+public class MetricAnomalyTester {
+
+//  public static String appId = MetricsCollectorInterface.serviceName;
+//  static final Log LOG = LogFactory.getLog(MetricAnomalyTester.class);
+//  static Map<String, TimelineMetric> timelineMetricMap = new HashMap<>();
+//
+//  public static TimelineMetrics runTestAnomalyRequest(MetricAnomalyDetectorTestInput input) throws UnknownHostException {
+//
+//    long currentTime = System.currentTimeMillis();
+//    TimelineMetrics timelineMetrics = new TimelineMetrics();
+//    String hostname = InetAddress.getLocalHost().getHostName();
+//
+//    //Train data
+//    TimelineMetric metric1 = new TimelineMetric();
+//    if (StringUtils.isNotEmpty(input.getTrainDataName())) {
+//      metric1 = timelineMetricMap.get(input.getTrainDataName());
+//      if (metric1 == null) {
+//        metric1 = new TimelineMetric();
+//        double[] trainSeries = MetricSeriesGeneratorFactory.generateSeries(input.getTrainDataType(), input.getTrainDataSize(), input.getTrainDataConfigs());
+//        metric1.setMetricName(input.getTrainDataName());
+//        metric1.setAppId(appId);
+//        metric1.setHostName(hostname);
+//        metric1.setStartTime(currentTime);
+//        metric1.setInstanceId(null);
+//        metric1.setMetricValues(getAsTimeSeries(currentTime, trainSeries));
+//        timelineMetricMap.put(input.getTrainDataName(), metric1);
+//      }
+//      timelineMetrics.getMetrics().add(metric1);
+//    } else {
+//      LOG.error("No train data name specified");
+//    }
+//
+//    //Test data
+//    TimelineMetric metric2 = new TimelineMetric();
+//    if (StringUtils.isNotEmpty(input.getTestDataName())) {
+//      metric2 = timelineMetricMap.get(input.getTestDataName());
+//      if (metric2 == null) {
+//        metric2 = new TimelineMetric();
+//        double[] testSeries = MetricSeriesGeneratorFactory.generateSeries(input.getTestDataType(), input.getTestDataSize(), input.getTestDataConfigs());
+//        metric2.setMetricName(input.getTestDataName());
+//        metric2.setAppId(appId);
+//        metric2.setHostName(hostname);
+//        metric2.setStartTime(currentTime);
+//        metric2.setInstanceId(null);
+//        metric2.setMetricValues(getAsTimeSeries(currentTime, testSeries));
+//        timelineMetricMap.put(input.getTestDataName(), metric2);
+//      }
+//      timelineMetrics.getMetrics().add(metric2);
+//    } else {
+//      LOG.warn("No test data name specified");
+//    }
+//
+//    //Invoke method
+//    if (CollectionUtils.isNotEmpty(input.getMethods())) {
+//      RFunctionInvoker.setScriptsDir("/etc/ambari-metrics-collector/conf/R-scripts");
+//      for (String methodType : input.getMethods()) {
+//        ResultSet result = RFunctionInvoker.executeMethod(methodType, getAsDataSeries(metric1), getAsDataSeries(metric2), input.getMethodConfigs());
+//        TimelineMetric timelineMetric = getAsTimelineMetric(result, methodType, input, currentTime, hostname);
+//        if (timelineMetric != null) {
+//          timelineMetrics.getMetrics().add(timelineMetric);
+//        }
+//      }
+//    } else {
+//      LOG.warn("No anomaly method requested");
+//    }
+//
+//    return timelineMetrics;
+//  }
+//
+//
+//  private static TimelineMetric getAsTimelineMetric(ResultSet result, String methodType, MetricAnomalyDetectorTestInput input, long currentTime, String hostname) {
+//
+//    if (result == null) {
+//      return null;
+//    }
+//
+//    TimelineMetric timelineMetric = new TimelineMetric();
+//    if (methodType.equals("tukeys") || methodType.equals("ema")) {
+//      timelineMetric.setMetricName(input.getTrainDataName() + "_" + input.getTestDataName() + "_" + methodType + "_" + currentTime);
+//      timelineMetric.setHostName(hostname);
+//      timelineMetric.setAppId(appId);
+//      timelineMetric.setInstanceId(null);
+//      timelineMetric.setStartTime(currentTime);
+//
+//      TreeMap<Long, Double> metricValues = new TreeMap<>();
+//      if (result.resultset.size() > 0) {
+//        double[] ts = result.resultset.get(0);
+//        double[] metrics = result.resultset.get(1);
+//        for (int i = 0; i < ts.length; i++) {
+//          if (i == 0) {
+//            timelineMetric.setStartTime((long) ts[i]);
+//          }
+//          metricValues.put((long) ts[i], metrics[i]);
+//        }
+//      }
+//      timelineMetric.setMetricValues(metricValues);
+//      return timelineMetric;
+//    }
+//    return null;
+//  }
+//
+//
+//  private static TreeMap<Long, Double> getAsTimeSeries(long currentTime, double[] values) {
+//
+//    long startTime = currentTime - (values.length - 1) * 60 * 1000;
+//    TreeMap<Long, Double> metricValues = new TreeMap<>();
+//
+//    for (int i = 0; i < values.length; i++) {
+//      metricValues.put(startTime, values[i]);
+//      startTime += (60 * 1000);
+//    }
+//    return metricValues;
+//  }
+//
+//  private static DataSeries getAsDataSeries(TimelineMetric timelineMetric) {
+//
+//    TreeMap<Long, Double> metricValues = timelineMetric.getMetricValues();
+//    double[] timestamps = new double[metricValues.size()];
+//    double[] values = new double[metricValues.size()];
+//    int i = 0;
+//
+//    for (Long timestamp : metricValues.keySet()) {
+//      timestamps[i] = timestamp;
+//      values[i++] = metricValues.get(timestamp);
+//    }
+//    return new DataSeries(timelineMetric.getMetricName() + "_" + timelineMetric.getAppId() + "_" + timelineMetric.getHostName(), timestamps, values);
+//  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestMetricSeriesGenerator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestMetricSeriesGenerator.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestMetricSeriesGenerator.java
new file mode 100644
index 0000000..3b2605b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestMetricSeriesGenerator.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.testing.utilities;
+
+/**
+ * Class which was originally used to send test series from AMS to Spark through Kafka.
+ */
+
+public class TestMetricSeriesGenerator {
+  //implements Runnable {
+
+//  private Map<TestSeriesInputRequest, AbstractMetricSeries> configuredSeries = new HashMap<>();
+//  private static final Log LOG = LogFactory.getLog(TestMetricSeriesGenerator.class);
+//  private TimelineMetricStore metricStore;
+//  private String hostname;
+//
+//  public TestMetricSeriesGenerator(TimelineMetricStore metricStore) {
+//    this.metricStore = metricStore;
+//    try {
+//      this.hostname = InetAddress.getLocalHost().getHostName();
+//    } catch (UnknownHostException e) {
+//      e.printStackTrace();
+//    }
+//  }
+//
+//  public void addSeries(TestSeriesInputRequest inputRequest) {
+//    if (!configuredSeries.containsKey(inputRequest)) {
+//      AbstractMetricSeries metricSeries = MetricSeriesGeneratorFactory.generateSeries(inputRequest.getSeriesType(), inputRequest.getConfigs());
+//      configuredSeries.put(inputRequest, metricSeries);
+//      LOG.info("Added series " + inputRequest.getSeriesName());
+//    }
+//  }
+//
+//  public void removeSeries(String seriesName) {
+//    boolean isPresent = false;
+//    TestSeriesInputRequest tbd = null;
+//    for (TestSeriesInputRequest inputRequest : configuredSeries.keySet()) {
+//      if (inputRequest.getSeriesName().equals(seriesName)) {
+//        isPresent = true;
+//        tbd = inputRequest;
+//      }
+//    }
+//    if (isPresent) {
+//      LOG.info("Removing series " + seriesName);
+//      configuredSeries.remove(tbd);
+//    } else {
+//      LOG.info("Series not found : " + seriesName);
+//    }
+//  }
+//
+//  @Override
+//  public void run() {
+//    long currentTime = System.currentTimeMillis();
+//    TimelineMetrics timelineMetrics = new TimelineMetrics();
+//
+//    for (TestSeriesInputRequest input : configuredSeries.keySet()) {
+//      AbstractMetricSeries metricSeries = configuredSeries.get(input);
+//      TimelineMetric timelineMetric = new TimelineMetric();
+//      timelineMetric.setMetricName(input.getSeriesName());
+//      timelineMetric.setAppId("anomaly-engine-test-metric");
+//      timelineMetric.setInstanceId(null);
+//      timelineMetric.setStartTime(currentTime);
+//      timelineMetric.setHostName(hostname);
+//      TreeMap<Long, Double> metricValues = new TreeMap();
+//      metricValues.put(currentTime, metricSeries.nextValue());
+//      timelineMetric.setMetricValues(metricValues);
+//      timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+//      LOG.info("Emitting metric with appId = " + timelineMetric.getAppId());
+//    }
+//    try {
+//      LOG.info("Publishing test metrics for " + timelineMetrics.getMetrics().size() + " series.");
+//      metricStore.putMetrics(timelineMetrics);
+//    } catch (Exception e) {
+//      LOG.error(e);
+//    }
+//  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestSeriesInputRequest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestSeriesInputRequest.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestSeriesInputRequest.java
new file mode 100644
index 0000000..d7db9ca
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestSeriesInputRequest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype.testing.utilities;
+
+import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Collections;
+import java.util.Map;
+
+@XmlRootElement
+public class TestSeriesInputRequest {
+
+  private String seriesName;
+  private String seriesType;
+  private Map<String, String> configs;
+
+  public TestSeriesInputRequest() {
+  }
+
+  public TestSeriesInputRequest(String seriesName, String seriesType, Map<String, String> configs) {
+    this.seriesName = seriesName;
+    this.seriesType = seriesType;
+    this.configs = configs;
+  }
+
+  public String getSeriesName() {
+    return seriesName;
+  }
+
+  public void setSeriesName(String seriesName) {
+    this.seriesName = seriesName;
+  }
+
+  public String getSeriesType() {
+    return seriesType;
+  }
+
+  public void setSeriesType(String seriesType) {
+    this.seriesType = seriesType;
+  }
+
+  public Map<String, String> getConfigs() {
+    return configs;
+  }
+
+  public void setConfigs(Map<String, String> configs) {
+    this.configs = configs;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    TestSeriesInputRequest anotherInput = (TestSeriesInputRequest)o;
+    return anotherInput.getSeriesName().equals(this.getSeriesName());
+  }
+
+  @Override
+  public int hashCode() {
+    return seriesName.hashCode();
+  }
+
+  public static void main(String[] args) {
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    TestSeriesInputRequest testSeriesInputRequest = new TestSeriesInputRequest("test", "ema", Collections.singletonMap("key","value"));
+    try {
+      System.out.print(objectMapper.writeValueAsString(testSeriesInputRequest));
+    } catch (JsonProcessingException e) {
+      e.printStackTrace();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/ema.R
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/ema.R b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/ema.R
new file mode 100644
index 0000000..0b66095
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/ema.R
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#  EMA <- w * EMA + (1 - w) * x
+# EMS <- sqrt( w * EMS^2 + (1 - w) * (x - EMA)^2 )
+# Alarm = abs(x - EMA) > n * EMS
+
+ema_global <- function(train_data, test_data, w, n) {
+  
+#  res <- get_data(url)
+#  data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+#  names(data) <- c("TS", res$metrics[[1]]$metricname)
+#  train_data <- data[which(data$TS >= train_start & data$TS <= train_end), 2]
+#  test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+  
+  anomalies <- data.frame()
+  ema <- 0
+  ems <- 0
+
+  #Train Step
+  for (x in train_data) {
+    ema <- w*ema + (1-w)*x
+    ems <- sqrt(w* ems^2 + (1 - w)*(x - ema)^2)
+  }
+  
+  for ( i in 1:length(test_data[,1])) {
+    x <- test_data[i,2]
+    if (abs(x - ema) > n*ems) {
+      anomaly <- c(as.numeric(test_data[i,1]), x)
+      # print (anomaly)
+      anomalies <- rbind(anomalies, anomaly)
+    }
+    ema <- w*ema + (1-w)*x
+    ems <- sqrt(w* ems^2 + (1 - w)*(x - ema)^2)
+  }
+  
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Value")
+  }
+  return (anomalies)
+}
+
+ema_daily <- function(train_data, test_data, w, n) {
+  
+#  res <- get_data(url)
+#  data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+#  names(data) <- c("TS", res$metrics[[1]]$metricname)
+#  train_data <- data[which(data$TS >= train_start & data$TS <= train_end), ]
+#  test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+  
+  anomalies <- data.frame()
+  ema <- vector("numeric", 7)
+  ems <- vector("numeric", 7)
+  
+  #Train Step
+  for ( i in 1:length(train_data[,1])) {
+    x <- train_data[i,2]
+    time <- as.POSIXlt(as.numeric(train_data[i,1])/1000, origin = "1970-01-01" ,tz = "GMT")
+    index <- time$wday
+    ema[index] <- w*ema[index] + (1-w)*x
+    ems[index] <- sqrt(w* ems[index]^2 + (1 - w)*(x - ema[index])^2)
+  }
+  
+  for ( i in 1:length(test_data[,1])) {
+    x <- test_data[i,2]
+    time <- as.POSIXlt(as.numeric(test_data[i,1])/1000, origin = "1970-01-01" ,tz = "GMT")
+    index <- time$wday
+    
+    if (abs(x - ema[index+1]) > n*ems[index+1]) {
+      anomaly <- c(as.numeric(test_data[i,1]), x)
+      # print (anomaly)
+      anomalies <- rbind(anomalies, anomaly)
+    }
+    ema[index+1] <- w*ema[index+1] + (1-w)*x
+    ems[index+1] <- sqrt(w* ems[index+1]^2 + (1 - w)*(x - ema[index+1])^2)
+  }
+  
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Value")
+  }
+  return(anomalies)
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/hsdev.r
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/hsdev.r b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/hsdev.r
new file mode 100644
index 0000000..bca3366
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/hsdev.r
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval, period) {
+
+  #res <- get_data(url)
+  #data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+  #names(data) <- c("TS", res$metrics[[1]]$metricname)
+  anomalies <- data.frame()
+
+  granularity <- train_data[2,1] - train_data[1,1]
+  test_start <- test_data[1,1]
+  test_end <- test_data[length(test_data[1,]),1]
+  train_start <- test_start - num_historic_periods*period
+  # round to start of day
+  train_start <- train_start - (train_start %% interval)
+
+  time <- as.POSIXlt(as.numeric(test_data[1,1])/1000, origin = "1970-01-01" ,tz = "GMT")
+  test_data_day <- time$wday
+
+  h_data <- c()
+  for ( i in length(train_data[,1]):1) {
+    ts <- train_data[i,1]
+    if ( ts < train_start) {
+      break
+    }
+    time <- as.POSIXlt(as.numeric(ts)/1000, origin = "1970-01-01" ,tz = "GMT")
+    if (time$wday == test_data_day) {
+      x <- train_data[i,2]
+      h_data <- c(h_data, x)
+    }
+  }
+
+  if (length(h_data) < 2*length(test_data[,1])) {
+    cat ("\nNot enough training data")
+    return (anomalies)
+  }
+
+  past_median <- median(h_data)
+  past_sd <- sd(h_data)
+  curr_median <- median(test_data[,2])
+
+  if (abs(curr_median - past_median) > n * past_sd) {
+    anomaly <- c(test_start, test_end, curr_median, past_median, past_sd)
+    anomalies <- rbind(anomalies, anomaly)
+  }
+
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", "Past SD")
+  }
+
+  return (anomalies)
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/iforest.R
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/iforest.R b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/iforest.R
new file mode 100644
index 0000000..8956400
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/iforest.R
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ams_iforest <- function(url, train_start, train_end, test_start, test_end, threshold_score) {
+  
+  res <- get_data(url)
+  num_metrics <- length(res$metrics)
+  anomalies <- data.frame()
+  
+  metricname <- res$metrics[[1]]$metricname
+  data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+  names(data) <- c("TS", res$metrics[[1]]$metricname)
+
+  for (i in 2:num_metrics) {
+    metricname <- res$metrics[[i]]$metricname
+    df <- data.frame(as.numeric(names(res$metrics[[i]]$metrics)), as.numeric(res$metrics[[i]]$metrics))
+    names(df) <- c("TS", res$metrics[[i]]$metricname)
+    data <- merge(data, df)
+  }
+  
+  algo_data <- data[ which(df$TS >= train_start & df$TS <= train_end) , ][c(1:num_metrics+1)]
+  iForest <- IsolationTrees(algo_data)
+  test_data <- data[ which(df$TS >= test_start & df$TS <= test_end) , ]
+  
+  if_res <- AnomalyScore(test_data[c(1:num_metrics+1)], iForest)
+  for (i in 1:length(if_res$outF)) {
+    index <- test_start+i-1
+    if (if_res$outF[i] > threshold_score) {
+      anomaly <- c(test_data[i,1], if_res$outF[i], if_res$pathLength[i])
+      anomalies <- rbind(anomalies, anomaly)
+    } 
+  }
+  
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Anomaly Score", "Path length")
+  }
+  return (anomalies)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/kstest.r
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/kstest.r b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/kstest.r
new file mode 100644
index 0000000..f22bc15
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/kstest.r
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ams_ks <- function(train_data, test_data, p_value) {
+  
+#  res <- get_data(url)
+#  data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+#  names(data) <- c("TS", res$metrics[[1]]$metricname)
+#  train_data <- data[which(data$TS >= train_start & data$TS <= train_end), 2]
+#  test_data <- data[which(data$TS >= test_start & data$TS <= test_end), 2]
+  
+  anomalies <- data.frame()
+  res <- ks.test(train_data[,2], test_data[,2])
+  
+  if (res[2] < p_value) {
+    anomaly <- c(test_data[1,1], test_data[length(test_data),1], res[1], res[2])
+    anomalies <- rbind(anomalies, anomaly)
+  }
+ 
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS Start", "TS End", "D", "p-value")
+  }
+  return (anomalies)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/test.R
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/test.R b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/test.R
new file mode 100644
index 0000000..7650356
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/test.R
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+tukeys_anomalies <- data.frame()
+ema_global_anomalies <- data.frame()
+ema_daily_anomalies <- data.frame()
+ks_anomalies <- data.frame()
+hsdev_anomalies <- data.frame()
+
+init <- function() {
+  tukeys_anomalies <- data.frame()
+  ema_global_anomalies <- data.frame()
+  ema_daily_anomalies <- data.frame()
+  ks_anomalies <- data.frame()
+  hsdev_anomalies <- data.frame()
+}
+
+test_methods <- function(data) {
+
+  init()
+  #res <- get_data(url)
+  #data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+  #names(data) <- c("TS", res$metrics[[1]]$metricname)
+
+  limit <- data[length(data[,1]),1]
+  step <- data[2,1] - data[1,1]
+
+  train_start <- data[1,1]
+  train_end <- get_next_day_boundary(train_start, step, limit)
+  test_start <- train_end + step
+  test_end <- get_next_day_boundary(test_start, step, limit)
+  i <- 1
+  day <- 24*60*60*1000
+
+  while (test_start < limit) {
+
+    print (i)
+    i <- i + 1
+    train_data <- data[which(data$TS >= train_start & data$TS <= train_end),]
+    test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+
+    #tukeys_anomalies <<- rbind(tukeys_anomalies, ams_tukeys(train_data, test_data, 3))
+    #ema_global_anomalies <<- rbind(ema_global_anomalies, ema_global(train_data, test_data, 0.9, 3))
+    #ema_daily_anomalies <<- rbind(ema_daily_anomalies, ema_daily(train_data, test_data, 0.9, 3))
+    #ks_anomalies <<- rbind(ks_anomalies, ams_ks(train_data, test_data, 0.05))
+    hsdev_train_data <- data[which(data$TS < test_start),]
+    hsdev_anomalies <<- rbind(hsdev_anomalies, hsdev_daily(hsdev_train_data, test_data, 3, 3, day, 7*day))
+
+    train_start <- test_start
+    train_end <- get_next_day_boundary(train_start, step, limit)
+    test_start <- train_end + step
+    test_end <- get_next_day_boundary(test_start, step, limit)
+  }
+  return (hsdev_anomalies)
+}
+
+get_next_day_boundary <- function(start, step, limit) {
+
+  if (start > limit) {
+    return (-1)
+  }
+
+  while (start <= limit) {
+    if (((start %% (24*60*60*1000)) - 28800000) == 0) {
+      return (start)
+    }
+    start <- start + step
+  }
+  return (start)
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/tukeys.r
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/tukeys.r b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/tukeys.r
new file mode 100644
index 0000000..0312226
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/tukeys.r
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+ams_tukeys <- function(train_data, test_data, n) {
+
+#  res <- get_data(url)
+#  data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+#  names(data) <- c("TS", res$metrics[[1]]$metricname)
+#  train_data <- data[which(data$TS >= train_start & data$TS <= train_end), 2]
+#  test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+
+  anomalies <- data.frame()
+  quantiles <- quantile(train_data[,2])
+  iqr <- quantiles[4] - quantiles[2]
+  niqr <- 0
+
+  for ( i in 1:length(test_data[,1])) {
+    x <- test_data[i,2]
+    lb <- quantiles[2] - n*iqr
+    ub <- quantiles[4] + n*iqr
+    if ( (x < lb)  || (x > ub) ) {
+      if (iqr != 0) {
+        if (x < lb) {
+          niqr <- (quantiles[2] - x) / iqr
+        } else {
+          niqr <- (x - quantiles[4]) / iqr
+        }
+      }
+        anomaly <- c(test_data[i,1], x, niqr)
+        anomalies <- rbind(anomalies, anomaly)
+      }
+  }
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Value", "niqr")
+  }
+  return (anomalies)
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/input-config.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/input-config.properties b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/input-config.properties
new file mode 100644
index 0000000..ab106c4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/input-config.properties
@@ -0,0 +1,42 @@
+# Copyright 2011 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+appIds=HOST
+
+collectorHost=localhost
+collectorPort=6188
+collectorProtocol=http
+
+zkQuorum=localhost:2181
+
+ambariServerHost=localhost
+clusterName=c1
+
+emaW=0.8
+emaN=3
+tukeysN=3
+pointInTimeTestInterval=300000
+pointInTimeTrainInterval=900000
+
+ksTestInterval=600000
+ksTrainInterval=600000
+hsdevNhp=3
+hsdevInterval=1800000;
+
+skipMetricPatterns=sdisk*,cpu_sintr*,proc*,disk*,boottime
+hosts=avijayan-ad-1.openstacklocal
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala
new file mode 100644
index 0000000..6122f5e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.spark.prototype
+
+import java.io.{FileInputStream, IOException, InputStream}
+import java.util
+import java.util.Properties
+import java.util.logging.LogManager
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.ambari.metrics.adservice.prototype.core.MetricsCollectorInterface
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.kafka._
+import org.apache.ambari.metrics.adservice.prototype.methods.{AnomalyDetectionTechnique, MetricAnomaly}
+import org.apache.ambari.metrics.adservice.prototype.methods.ema.{EmaModelLoader, EmaTechnique}
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics
+import org.apache.log4j.Logger
+import org.apache.spark.storage.StorageLevel
+
+object MetricAnomalyDetector {
+
+  /*
+    Load current EMA model
+    Filter step - Check if anomaly
+    Collect / Write to AMS / Print.
+   */
+
+//  var brokers = "avijayan-ams-1.openstacklocal:2181,avijayan-ams-2.openstacklocal:2181,avijayan-ams-3.openstacklocal:2181"
+//  var groupId = "ambari-metrics-group"
+//  var topicName = "ambari-metrics-topic"
+//  var numThreads = 1
+//  val anomalyDetectionModels: Array[AnomalyDetectionTechnique] = Array[AnomalyDetectionTechnique]()
+//
+//  def readProperties(propertiesFile: String): Properties = try {
+//    val properties = new Properties
+//    var inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile)
+//    if (inputStream == null) inputStream = new FileInputStream(propertiesFile)
+//    properties.load(inputStream)
+//    properties
+//  } catch {
+//    case ioEx: IOException =>
+//      null
+//  }
+//
+//  def main(args: Array[String]): Unit = {
+//
+//    @transient
+//    lazy val log = org.apache.log4j.LogManager.getLogger("MetricAnomalyDetectorLogger")
+//
+//    if (args.length < 1) {
+//      System.err.println("Usage: MetricSparkConsumer <input-config-file>")
+//      System.exit(1)
+//    }
+//
+//    //Read properties
+//    val properties = readProperties(propertiesFile = args(0))
+//
+//    //Load EMA parameters - w, n
+//    val emaW = properties.getProperty("emaW").toDouble
+//    val emaN = properties.getProperty("emaN").toDouble
+//
+//    //collector info
+//    val collectorHost: String = properties.getProperty("collectorHost")
+//    val collectorPort: String = properties.getProperty("collectorPort")
+//    val collectorProtocol: String = properties.getProperty("collectorProtocol")
+//    val anomalyMetricPublisher = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort)
+//
+//    //Instantiate Kafka stream reader
+//    val sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector")
+//    val streamingContext = new StreamingContext(sparkConf, Duration(10000))
+//
+//    val topicsSet = topicName.toSet
+//    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
+////    val stream = KafkaUtils.createDirectStream()
+//
+//    val kafkaStream = KafkaUtils.createStream(streamingContext, zkQuorum, groupId, Map(topicName -> numThreads), StorageLevel.MEMORY_AND_DISK_SER_2)
+//    kafkaStream.print()
+//
+//    var timelineMetricsStream = kafkaStream.map( message => {
+//      val mapper = new ObjectMapper
+//      val metrics = mapper.readValue(message._2, classOf[TimelineMetrics])
+//      metrics
+//    })
+//    timelineMetricsStream.print()
+//
+//    var appMetricStream = timelineMetricsStream.map( timelineMetrics => {
+//      (timelineMetrics.getMetrics.get(0).getAppId, timelineMetrics)
+//    })
+//    appMetricStream.print()
+//
+//    var filteredAppMetricStream = appMetricStream.filter( appMetricTuple => {
+//      appIds.contains(appMetricTuple._1)
+//    } )
+//    filteredAppMetricStream.print()
+//
+//    filteredAppMetricStream.foreachRDD( rdd => {
+//      rdd.foreach( appMetricTuple => {
+//        val timelineMetrics = appMetricTuple._2
+//        logger.info("Received Metric (1): " + timelineMetrics.getMetrics.get(0).getMetricName)
+//        log.info("Received Metric (2): " + timelineMetrics.getMetrics.get(0).getMetricName)
+//        for (timelineMetric <- timelineMetrics.getMetrics) {
+//          var anomalies = emaModel.test(timelineMetric)
+//          anomalyMetricPublisher.publish(anomalies)
+//        }
+//      })
+//    })
+//
+//    streamingContext.start()
+//    streamingContext.awaitTermination()
+//  }
+  }

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala
new file mode 100644
index 0000000..6e1ae07
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.spark.prototype
+
+import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+
+object SparkPhoenixReader {
+
+  def main(args: Array[String]) {
+
+    if (args.length < 6) {
+      System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>")
+      System.exit(1)
+    }
+
+    var metricName = args(0)
+    var appId = args(1)
+    var hostname = args(2)
+    var weight = args(3).toDouble
+    var timessdev = args(4).toInt
+    var phoenixConnectionString = args(5) //avijayan-ams-3.openstacklocal:61181:/ams-hbase-unsecure
+    var modelDir = args(6)
+
+    val conf = new SparkConf()
+    conf.set("spark.app.name", "AMSAnomalyModelBuilder")
+    //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077")
+
+    var sc = new SparkContext(conf)
+    val sqlContext = new SQLContext(sc)
+
+    val currentTime = System.currentTimeMillis()
+    val oneDayBack = currentTime - 24*60*60*1000
+
+    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString))
+    df.registerTempTable("METRIC_RECORD")
+    val result = sqlContext.sql("SELECT METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME, METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD " +
+      "WHERE METRIC_NAME = '" + metricName + "' AND HOSTNAME = '" + hostname + "' AND APP_ID = '" + appId + "' AND SERVER_TIME < " + currentTime + " AND SERVER_TIME > " + oneDayBack)
+
+    var metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double]
+    result.collect().foreach(
+      t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5))
+    )
+
+    //val seriesName = result.head().getString(0)
+    //val hostname = result.head().getString(1)
+    //val appId = result.head().getString(2)
+
+    val timelineMetric = new TimelineMetric()
+    timelineMetric.setMetricName(metricName)
+    timelineMetric.setAppId(appId)
+    timelineMetric.setHostName(hostname)
+    timelineMetric.setMetricValues(metricValues)
+
+    var emaModel = new EmaTechnique(weight, timessdev)
+    emaModel.test(timelineMetric)
+    emaModel.save(sc, modelDir)
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestEmaTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestEmaTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestEmaTechnique.java
new file mode 100644
index 0000000..76a00a6
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestEmaTechnique.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype;
+
+import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker;
+import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+import java.util.TreeMap;
+
+import static org.apache.ambari.metrics.adservice.prototype.TestRFunctionInvoker.getTS;
+
+public class TestEmaTechnique {
+
+  private static double[] ts;
+  private static String fullFilePath;
+
+  @BeforeClass
+  public static void init() throws URISyntaxException {
+
+    Assume.assumeTrue(System.getenv("R_HOME") != null);
+    ts = getTS(1000);
+    URL url = ClassLoader.getSystemResource("R-scripts");
+    fullFilePath = new File(url.toURI()).getAbsolutePath();
+    RFunctionInvoker.setScriptsDir(fullFilePath);
+  }
+
+  @Test
+  public void testEmaInitialization() {
+
+    EmaTechnique ema = new EmaTechnique(0.5, 3);
+    Assert.assertTrue(ema.getTrackedEmas().isEmpty());
+    Assert.assertTrue(ema.getStartingWeight() == 0.5);
+    Assert.assertTrue(ema.getStartTimesSdev() == 2);
+  }
+
+  @Test
+  public void testEma() {
+    EmaTechnique ema = new EmaTechnique(0.5, 3);
+
+    long now = System.currentTimeMillis();
+
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("M1");
+    metric1.setHostName("H1");
+    metric1.setStartTime(now - 1000);
+    metric1.setAppId("A1");
+    metric1.setInstanceId(null);
+    metric1.setType("Integer");
+
+    //Train
+    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    for (int i = 0; i < 50; i++) {
+      double metric = 20000 + Math.random();
+      metricValues.put(now - i * 100, metric);
+    }
+    metric1.setMetricValues(metricValues);
+    List<MetricAnomaly> anomalyList = ema.test(metric1);
+//    Assert.assertTrue(anomalyList.isEmpty());
+
+    metricValues = new TreeMap<Long, Double>();
+    for (int i = 0; i < 50; i++) {
+      double metric = 20000 + Math.random();
+      metricValues.put(now - i * 100, metric);
+    }
+    metric1.setMetricValues(metricValues);
+    anomalyList = ema.test(metric1);
+    Assert.assertTrue(!anomalyList.isEmpty());
+    int l1 = anomalyList.size();
+
+    Assert.assertTrue(ema.updateModel(metric1, false, 20));
+    anomalyList = ema.test(metric1);
+    int l2 = anomalyList.size();
+    Assert.assertTrue(l2 < l1);
+
+    Assert.assertTrue(ema.updateModel(metric1, true, 50));
+    anomalyList = ema.test(metric1);
+    int l3 = anomalyList.size();
+    Assert.assertTrue(l3 > l2 && l3 > l1);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestRFunctionInvoker.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestRFunctionInvoker.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestRFunctionInvoker.java
new file mode 100644
index 0000000..98fa050
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestRFunctionInvoker.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.prototype;
+
+import org.apache.ambari.metrics.adservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.adservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker;
+import org.apache.ambari.metrics.adservice.seriesgenerator.UniformMetricSeries;
+import org.apache.commons.lang.ArrayUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestRFunctionInvoker {
+
+  private static String metricName = "TestMetric";
+  private static double[] ts;
+  private static String fullFilePath;
+
+  @BeforeClass
+  public static void init() throws URISyntaxException {
+
+    Assume.assumeTrue(System.getenv("R_HOME") != null);
+    ts = getTS(1000);
+    URL url = ClassLoader.getSystemResource("R-scripts");
+    fullFilePath = new File(url.toURI()).getAbsolutePath();
+    RFunctionInvoker.setScriptsDir(fullFilePath);
+  }
+
+  @Test
+  public void testTukeys() throws URISyntaxException {
+
+    double[] train_ts = ArrayUtils.subarray(ts, 0, 750);
+    double[] train_x = getRandomData(750);
+    DataSeries trainData = new DataSeries(metricName, train_ts, train_x);
+
+    double[] test_ts = ArrayUtils.subarray(ts, 750, 1000);
+    double[] test_x = getRandomData(250);
+    test_x[50] = 5.5; //Anomaly
+    DataSeries testData = new DataSeries(metricName, test_ts, test_x);
+    Map<String, String> configs = new HashMap();
+    configs.put("tukeys.n", "3");
+
+    ResultSet rs = RFunctionInvoker.tukeys(trainData, testData, configs);
+    Assert.assertEquals(rs.resultset.size(), 2);
+    Assert.assertEquals(rs.resultset.get(1)[0], 5.5, 0.1);
+
+  }
+
+  public static void main(String[] args) throws URISyntaxException {
+
+    String metricName = "TestMetric";
+    double[] ts = getTS(1000);
+    URL url = ClassLoader.getSystemResource("R-scripts");
+    String fullFilePath = new File(url.toURI()).getAbsolutePath();
+    RFunctionInvoker.setScriptsDir(fullFilePath);
+
+    double[] train_ts = ArrayUtils.subarray(ts, 0, 750);
+    double[] train_x = getRandomData(750);
+    DataSeries trainData = new DataSeries(metricName, train_ts, train_x);
+
+    double[] test_ts = ArrayUtils.subarray(ts, 750, 1000);
+    double[] test_x = getRandomData(250);
+    test_x[50] = 5.5; //Anomaly
+    DataSeries testData = new DataSeries(metricName, test_ts, test_x);
+    ResultSet rs;
+
+    Map<String, String> configs = new HashMap();
+
+    System.out.println("TUKEYS");
+    configs.put("tukeys.n", "3");
+    rs = RFunctionInvoker.tukeys(trainData, testData, configs);
+    rs.print();
+    System.out.println("--------------");
+
+//    System.out.println("EMA Global");
+//    configs.put("ema.n", "3");
+//    configs.put("ema.w", "0.8");
+//    rs = RFunctionInvoker.ema_global(trainData, testData, configs);
+//    rs.print();
+//    System.out.println("--------------");
+//
+//    System.out.println("EMA Daily");
+//    rs = RFunctionInvoker.ema_daily(trainData, testData, configs);
+//    rs.print();
+//    System.out.println("--------------");
+//
+//    configs.put("ks.p_value", "0.00005");
+//    System.out.println("KS Test");
+//    rs = RFunctionInvoker.ksTest(trainData, testData, configs);
+//    rs.print();
+//    System.out.println("--------------");
+//
+    ts = getTS(5000);
+    train_ts = ArrayUtils.subarray(ts, 0, 4800);
+    train_x = getRandomData(4800);
+    trainData = new DataSeries(metricName, train_ts, train_x);
+    test_ts = ArrayUtils.subarray(ts, 4800, 5000);
+    test_x = getRandomData(200);
+    for (int i = 0; i < 200; i++) {
+      test_x[i] = test_x[i] * 5;
+    }
+    testData = new DataSeries(metricName, test_ts, test_x);
+    configs.put("hsdev.n", "3");
+    configs.put("hsdev.nhp", "3");
+    configs.put("hsdev.interval", "86400000");
+    configs.put("hsdev.period", "604800000");
+    System.out.println("HSdev");
+    rs = RFunctionInvoker.hsdev(trainData, testData, configs);
+    rs.print();
+    System.out.println("--------------");
+
+  }
+
+  static double[] getTS(int n) {
+    long currentTime = System.currentTimeMillis();
+    double[] ts = new double[n];
+    currentTime = currentTime - (currentTime % (5 * 60 * 1000));
+
+    for (int i = 0, j = n - 1; i < n; i++, j--) {
+      ts[j] = currentTime;
+      currentTime = currentTime - (5 * 60 * 1000);
+    }
+    return ts;
+  }
+
+  static double[] getRandomData(int n) {
+
+    UniformMetricSeries metricSeries =  new UniformMetricSeries(10, 0.1,0.05, 0.6, 0.8, true);
+    return metricSeries.getSeries(n);
+
+//    double[] metrics = new double[n];
+//    Random random = new Random();
+//    for (int i = 0; i < n; i++) {
+//      metrics[i] = random.nextDouble();
+//    }
+//    return metrics;
+  }
+}