You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2017/09/27 21:04:53 UTC

[3/6] ambari git commit: AMBARI-22077 : Create maven module and package structure for the anomaly detection engine. (avijayan)

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricsCollectorInterface.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricsCollectorInterface.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricsCollectorInterface.java
new file mode 100644
index 0000000..246565d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricsCollectorInterface.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.core;
+
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeMap;
+
+public class MetricsCollectorInterface implements Serializable {
+
+  private static String hostName = null;
+  private String instanceId = null;
+  public final static String serviceName = "anomaly-engine";
+  private String collectorHost;
+  private String protocol;
+  private String port;
+  private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
+  private static final Log LOG = LogFactory.getLog(MetricsCollectorInterface.class);
+  private static ObjectMapper mapper;
+  private final static ObjectReader timelineObjectReader;
+
+  static {
+    mapper = new ObjectMapper();
+    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+    mapper.setAnnotationIntrospector(introspector);
+    mapper.getSerializationConfig()
+      .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+    timelineObjectReader = mapper.reader(TimelineMetrics.class);
+  }
+
+  public MetricsCollectorInterface(String collectorHost, String protocol, String port) {
+    this.collectorHost = collectorHost;
+    this.protocol = protocol;
+    this.port = port;
+    this.hostName = getDefaultLocalHostName();
+  }
+
+  public static String getDefaultLocalHostName() {
+
+    if (hostName != null) {
+      return hostName;
+    }
+
+    try {
+      return InetAddress.getLocalHost().getCanonicalHostName();
+    } catch (UnknownHostException e) {
+      LOG.info("Error getting host address");
+    }
+    return null;
+  }
+
+  public void publish(List<MetricAnomaly> metricAnomalies) {
+    if (CollectionUtils.isNotEmpty(metricAnomalies)) {
+      LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
+      List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
+      if (!metricList.isEmpty()) {
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.setMetrics(metricList);
+        emitMetrics(timelineMetrics);
+      }
+    } else {
+      LOG.debug("No anomalies to send.");
+    }
+  }
+
+  private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
+    List<TimelineMetric> metrics = new ArrayList<>();
+
+    if (metricAnomalies.isEmpty()) {
+      return metrics;
+    }
+
+    for (MetricAnomaly anomaly : metricAnomalies) {
+      TimelineMetric timelineMetric = new TimelineMetric();
+      timelineMetric.setMetricName(anomaly.getMetricKey());
+      timelineMetric.setAppId(serviceName + "-" + anomaly.getMethodType());
+      timelineMetric.setInstanceId(null);
+      timelineMetric.setHostName(getDefaultLocalHostName());
+      timelineMetric.setStartTime(anomaly.getTimestamp());
+      HashMap<String, String> metadata = new HashMap<>();
+      metadata.put("method", anomaly.getMethodType());
+      metadata.put("anomaly-score", String.valueOf(anomaly.getAnomalyScore()));
+      timelineMetric.setMetadata(metadata);
+      TreeMap<Long,Double> metricValues = new TreeMap<>();
+      metricValues.put(anomaly.getTimestamp(), anomaly.getMetricValue());
+      timelineMetric.setMetricValues(metricValues);
+
+      metrics.add(timelineMetric);
+    }
+    return metrics;
+  }
+
+  public boolean emitMetrics(TimelineMetrics metrics) {
+    String connectUrl = constructTimelineMetricUri();
+    String jsonData = null;
+    LOG.debug("EmitMetrics connectUrl = " + connectUrl);
+    try {
+      jsonData = mapper.writeValueAsString(metrics);
+      LOG.info(jsonData);
+    } catch (IOException e) {
+      LOG.error("Unable to parse metrics", e);
+    }
+    if (jsonData != null) {
+      return emitMetricsJson(connectUrl, jsonData);
+    }
+    return false;
+  }
+
+  private HttpURLConnection getConnection(String spec) throws IOException {
+    return (HttpURLConnection) new URL(spec).openConnection();
+  }
+
+  private boolean emitMetricsJson(String connectUrl, String jsonData) {
+    int timeout = 10000;
+    HttpURLConnection connection = null;
+    try {
+      if (connectUrl == null) {
+        throw new IOException("Unknown URL. Unable to connect to metrics collector.");
+      }
+      connection = getConnection(connectUrl);
+
+      connection.setRequestMethod("POST");
+      connection.setRequestProperty("Content-Type", "application/json");
+      connection.setRequestProperty("Connection", "Keep-Alive");
+      connection.setConnectTimeout(timeout);
+      connection.setReadTimeout(timeout);
+      connection.setDoOutput(true);
+
+      if (jsonData != null) {
+        try (OutputStream os = connection.getOutputStream()) {
+          os.write(jsonData.getBytes("UTF-8"));
+        }
+      }
+
+      int statusCode = connection.getResponseCode();
+
+      if (statusCode != 200) {
+        LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
+          "statusCode = " + statusCode);
+      } else {
+        LOG.info("Metrics posted to Collector " + connectUrl);
+      }
+      return true;
+    } catch (IOException ioe) {
+      LOG.error(ioe.getMessage());
+    }
+    return false;
+  }
+
+  private String constructTimelineMetricUri() {
+    StringBuilder sb = new StringBuilder(protocol);
+    sb.append("://");
+    sb.append(collectorHost);
+    sb.append(":");
+    sb.append(port);
+    sb.append(WS_V1_TIMELINE_METRICS);
+    return sb.toString();
+  }
+
+  public TimelineMetrics fetchMetrics(String metricName,
+                                      String appId,
+                                      String hostname,
+                                      long startime,
+                                      long endtime) {
+
+    String url = constructTimelineMetricUri() + "?metricNames=" + metricName + "&appId=" + appId +
+      "&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + endtime;
+    LOG.debug("Fetch metrics URL : " + url);
+
+    URL obj = null;
+    BufferedReader in = null;
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+    try {
+      obj = new URL(url);
+      HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+      con.setRequestMethod("GET");
+      int responseCode = con.getResponseCode();
+      LOG.debug("Sending 'GET' request to URL : " + url);
+      LOG.debug("Response Code : " + responseCode);
+
+      in = new BufferedReader(
+        new InputStreamReader(con.getInputStream()));
+      timelineMetrics = timelineObjectReader.readValue(in);
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      if (in != null) {
+        try {
+          in.close();
+        } catch (IOException e) {
+          LOG.warn(e);
+        }
+      }
+    }
+
+    LOG.info("Fetched " + timelineMetrics.getMetrics().size() + " metrics.");
+    return timelineMetrics;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/PointInTimeADSystem.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/PointInTimeADSystem.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/PointInTimeADSystem.java
new file mode 100644
index 0000000..c579515
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/PointInTimeADSystem.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.core;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaModel;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class PointInTimeADSystem implements Serializable {
+
+  //private EmaTechnique emaTechnique;
+  private MetricsCollectorInterface metricsCollectorInterface;
+  private Map<String, Double> tukeysNMap;
+  private double defaultTukeysN = 3;
+
+  private long testIntervalMillis = 5*60*1000; //10mins
+  private long trainIntervalMillis = 15*60*1000; //1hour
+
+  private static final Log LOG = LogFactory.getLog(PointInTimeADSystem.class);
+
+  private AmbariServerInterface ambariServerInterface;
+  private int sensitivity = 50;
+  private int minSensitivity = 0;
+  private int maxSensitivity = 100;
+
+  public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN,
+                             long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) {
+    this.metricsCollectorInterface = metricsCollectorInterface;
+    this.defaultTukeysN = defaultTukeysN;
+    this.tukeysNMap = new HashMap<>();
+    this.testIntervalMillis = testIntervalMillis;
+    this.trainIntervalMillis = trainIntervalMillis;
+    this.ambariServerInterface = new AmbariServerInterface(ambariServerHost, clusterName);
+    LOG.info("Starting PointInTimeADSystem...");
+  }
+
+  public void runTukeysAndRefineEma(EmaTechnique emaTechnique, long startTime) {
+    LOG.info("Running Tukeys for test data interval [" + new Date(startTime - testIntervalMillis) + " : " + new Date(startTime) + "], with train data period [" + new Date(startTime  - testIntervalMillis - trainIntervalMillis) + " : " + new Date(startTime - testIntervalMillis) + "]");
+
+    int requiredSensivity = ambariServerInterface.getPointInTimeSensitivity();
+    if (requiredSensivity == -1 || requiredSensivity == sensitivity) {
+      LOG.info("No change in sensitivity needed.");
+    } else {
+      LOG.info("Current tukey's N value = " + defaultTukeysN);
+      if (requiredSensivity > sensitivity) {
+        int targetSensitivity = Math.min(maxSensitivity, requiredSensivity);
+        while (sensitivity < targetSensitivity) {
+          defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.05;
+          sensitivity++;
+        }
+      } else {
+        int targetSensitivity = Math.max(minSensitivity, requiredSensivity);
+        while (sensitivity > targetSensitivity) {
+          defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.05;
+          sensitivity--;
+        }
+      }
+      LOG.info("New tukey's N value = " + defaultTukeysN);
+    }
+
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    for (String metricKey : emaTechnique.getTrackedEmas().keySet()) {
+      LOG.info("EMA key = " + metricKey);
+      EmaModel emaModel = emaTechnique.getTrackedEmas().get(metricKey);
+      String metricName = emaModel.getMetricName();
+      String appId = emaModel.getAppId();
+      String hostname = emaModel.getHostname();
+
+      TimelineMetrics tukeysData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, startTime - (testIntervalMillis + trainIntervalMillis),
+        startTime);
+
+      if (tukeysData.getMetrics().isEmpty()) {
+        LOG.info("No metrics fetched for Tukeys, metricKey = " + metricKey);
+        continue;
+      }
+
+      List<Double> trainTsList = new ArrayList<>();
+      List<Double> trainDataList = new ArrayList<>();
+      List<Double> testTsList = new ArrayList<>();
+      List<Double> testDataList = new ArrayList<>();
+
+      for (TimelineMetric metric : tukeysData.getMetrics()) {
+        for (Long timestamp : metric.getMetricValues().keySet()) {
+          if (timestamp <= (startTime - testIntervalMillis)) {
+            trainDataList.add(metric.getMetricValues().get(timestamp));
+            trainTsList.add((double)timestamp);
+          } else {
+            testDataList.add(metric.getMetricValues().get(timestamp));
+            testTsList.add((double)timestamp);
+          }
+        }
+      }
+
+      if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+        LOG.info("Not enough train/test data to perform analysis.");
+        continue;
+      }
+
+      String tukeysTrainSeries = "tukeysTrainSeries";
+      double[] trainTs = new double[trainTsList.size()];
+      double[] trainData = new double[trainTsList.size()];
+      for (int i = 0; i < trainTs.length; i++) {
+        trainTs[i] = trainTsList.get(i);
+        trainData[i] = trainDataList.get(i);
+      }
+
+      String tukeysTestSeries = "tukeysTestSeries";
+      double[] testTs = new double[testTsList.size()];
+      double[] testData = new double[testTsList.size()];
+      for (int i = 0; i < testTs.length; i++) {
+        testTs[i] = testTsList.get(i);
+        testData[i] = testDataList.get(i);
+      }
+
+      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
+
+      DataSeries tukeysTrainData = new DataSeries(tukeysTrainSeries, trainTs, trainData);
+      DataSeries tukeysTestData = new DataSeries(tukeysTestSeries, testTs, testData);
+
+      if (!tukeysNMap.containsKey(metricKey)) {
+        tukeysNMap.put(metricKey, defaultTukeysN);
+      }
+
+      Map<String, String> configs = new HashMap<>();
+      configs.put("tukeys.n", String.valueOf(tukeysNMap.get(metricKey)));
+
+      ResultSet rs = RFunctionInvoker.tukeys(tukeysTrainData, tukeysTestData, configs);
+
+      List<TimelineMetric> tukeysMetrics = getAsTimelineMetric(rs, metricName, appId, hostname);
+      LOG.info("Tukeys anomalies size : " + tukeysMetrics.size());
+      TreeMap<Long, Double> tukeysMetricValues = new TreeMap<>();
+
+      for (TimelineMetric tukeysMetric : tukeysMetrics) {
+        tukeysMetricValues.putAll(tukeysMetric.getMetricValues());
+        timelineMetrics.addOrMergeTimelineMetric(tukeysMetric);
+      }
+
+      TimelineMetrics emaData = metricsCollectorInterface.fetchMetrics(metricKey, MetricsCollectorInterface.serviceName+"-ema", MetricsCollectorInterface.getDefaultLocalHostName(), startTime - testIntervalMillis, startTime);
+      TreeMap<Long, Double> emaMetricValues = new TreeMap();
+      if (!emaData.getMetrics().isEmpty()) {
+        emaMetricValues = emaData.getMetrics().get(0).getMetricValues();
+      }
+
+      LOG.info("Ema anomalies size : " + emaMetricValues.size());
+      int tp = 0;
+      int tn = 0;
+      int fp = 0;
+      int fn = 0;
+
+      for (double ts : testTs) {
+        long timestamp = (long) ts;
+        if (tukeysMetricValues.containsKey(timestamp)) {
+          if (emaMetricValues.containsKey(timestamp)) {
+            tp++;
+          } else {
+            fn++;
+          }
+        } else {
+          if (emaMetricValues.containsKey(timestamp)) {
+            fp++;
+          } else {
+            tn++;
+          }
+        }
+      }
+
+      double recall = (double) tp / (double) (tp + fn);
+      double precision = (double) tp / (double) (tp + fp);
+      LOG.info("----------------------------");
+      LOG.info("Precision Recall values for " + metricKey);
+      LOG.info("tp=" + tp + ", fp=" + fp + ", tn=" + tn + ", fn=" + fn);
+      LOG.info("----------------------------");
+
+      if (recall < 0.5) {
+        LOG.info("Increasing EMA sensitivity by 10%");
+        emaModel.updateModel(true, 5);
+      } else if (precision < 0.5) {
+        LOG.info("Decreasing EMA sensitivity by 10%");
+        emaModel.updateModel(false, 5);
+      }
+
+    }
+
+    if (emaTechnique.getTrackedEmas().isEmpty()){
+      LOG.info("No EMA Technique keys tracked!!!!");
+    }
+
+    if (!timelineMetrics.getMetrics().isEmpty()) {
+      metricsCollectorInterface.emitMetrics(timelineMetrics);
+    }
+  }
+
+  private static List<TimelineMetric> getAsTimelineMetric(ResultSet result, String metricName, String appId, String hostname) {
+
+    List<TimelineMetric> timelineMetrics = new ArrayList<>();
+
+    if (result == null) {
+      LOG.info("ResultSet from R call is null!!");
+      return null;
+    }
+
+    if (result.resultset.size() > 0) {
+      double[] ts = result.resultset.get(0);
+      double[] metrics = result.resultset.get(1);
+      double[] anomalyScore = result.resultset.get(2);
+      for (int i = 0; i < ts.length; i++) {
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(metricName + ":" + appId + ":" + hostname);
+        timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+        timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys");
+        timelineMetric.setInstanceId(null);
+        timelineMetric.setStartTime((long) ts[i]);
+        TreeMap<Long, Double> metricValues = new TreeMap<>();
+        metricValues.put((long) ts[i], metrics[i]);
+
+        HashMap<String, String> metadata = new HashMap<>();
+        metadata.put("method", "tukeys");
+        if (String.valueOf(anomalyScore[i]).equals("infinity")) {
+          LOG.info("Got anomalyScore = infinity for " + metricName + ":" + appId + ":" + hostname);
+        } else {
+          metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
+        }
+        timelineMetric.setMetadata(metadata);
+
+        timelineMetric.setMetricValues(metricValues);
+        timelineMetrics.add(timelineMetric);
+      }
+    }
+
+    return timelineMetrics;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java
new file mode 100644
index 0000000..4538f0b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.core;
+
+
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.rosuda.JRI.REXP;
+import org.rosuda.JRI.RVector;
+import org.rosuda.JRI.Rengine;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class RFunctionInvoker {
+
+  static final Log LOG = LogFactory.getLog(RFunctionInvoker.class);
+  public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
+  private static String rScriptDir = "/usr/lib/ambari-metrics-collector/R-scripts";
+
+  private static void loadDataSets(Rengine r, DataSeries trainData, DataSeries testData) {
+    r.assign("train_ts", trainData.ts);
+    r.assign("train_x", trainData.values);
+    r.eval("train_data <- data.frame(train_ts,train_x)");
+    r.eval("names(train_data) <- c(\"TS\", " + trainData.seriesName + ")");
+
+    r.assign("test_ts", testData.ts);
+    r.assign("test_x", testData.values);
+    r.eval("test_data <- data.frame(test_ts,test_x)");
+    r.eval("names(test_data) <- c(\"TS\", " + testData.seriesName + ")");
+  }
+
+  public static void setScriptsDir(String dir) {
+    rScriptDir = dir;
+  }
+
+  public static ResultSet executeMethod(String methodType, DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+
+    ResultSet result;
+    switch (methodType) {
+      case "tukeys":
+        result = tukeys(trainData, testData, configs);
+        break;
+      case "ema":
+        result = ema_global(trainData, testData, configs);
+        break;
+      case "ks":
+        result = ksTest(trainData, testData, configs);
+        break;
+      case "hsdev":
+        result = hsdev(trainData, testData, configs);
+        break;
+      default:
+        result = tukeys(trainData, testData, configs);
+        break;
+    }
+    return result;
+  }
+
+  public static ResultSet tukeys(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+    try {
+
+      REXP exp1 = r.eval("source('" + rScriptDir + "/tukeys.r" + "')");
+
+      double n = Double.parseDouble(configs.get("tukeys.n"));
+      r.eval("n <- " + n);
+
+      loadDataSets(r, trainData, testData);
+
+      r.eval("an <- ams_tukeys(train_data, test_data, n)");
+      REXP exp = r.eval("an");
+      RVector cont = (RVector) exp.getContent();
+      List<double[]> result = new ArrayList();
+      for (int i = 0; i < cont.size(); i++) {
+        result.add(cont.at(i).asDoubleArray());
+      }
+      return new ResultSet(result);
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      r.end();
+    }
+    return null;
+  }
+
+  public static ResultSet ema_global(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+    try {
+      r.eval("source('" + rScriptDir + "/ema.r" + "')");
+
+      int n = Integer.parseInt(configs.get("ema.n"));
+      r.eval("n <- " + n);
+
+      double w = Double.parseDouble(configs.get("ema.w"));
+      r.eval("w <- " + w);
+
+      loadDataSets(r, trainData, testData);
+
+      r.eval("an <- ema_global(train_data, test_data, w, n)");
+      REXP exp = r.eval("an");
+      RVector cont = (RVector) exp.getContent();
+      List<double[]> result = new ArrayList();
+      for (int i = 0; i < cont.size(); i++) {
+        result.add(cont.at(i).asDoubleArray());
+      }
+      return new ResultSet(result);
+
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      r.end();
+    }
+    return null;
+  }
+
+  public static ResultSet ema_daily(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+    try {
+      r.eval("source('" + rScriptDir + "/ema.r" + "')");
+
+      int n = Integer.parseInt(configs.get("ema.n"));
+      r.eval("n <- " + n);
+
+      double w = Double.parseDouble(configs.get("ema.w"));
+      r.eval("w <- " + w);
+
+      loadDataSets(r, trainData, testData);
+
+      r.eval("an <- ema_daily(train_data, test_data, w, n)");
+      REXP exp = r.eval("an");
+      RVector cont = (RVector) exp.getContent();
+      List<double[]> result = new ArrayList();
+      for (int i = 0; i < cont.size(); i++) {
+        result.add(cont.at(i).asDoubleArray());
+      }
+      return new ResultSet(result);
+
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      r.end();
+    }
+    return null;
+  }
+
+  public static ResultSet ksTest(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+    try {
+      r.eval("source('" + rScriptDir + "/kstest.r" + "')");
+
+      double p_value = Double.parseDouble(configs.get("ks.p_value"));
+      r.eval("p_value <- " + p_value);
+
+      loadDataSets(r, trainData, testData);
+
+      r.eval("an <- ams_ks(train_data, test_data, p_value)");
+      REXP exp = r.eval("an");
+      RVector cont = (RVector) exp.getContent();
+      List<double[]> result = new ArrayList();
+      for (int i = 0; i < cont.size(); i++) {
+        result.add(cont.at(i).asDoubleArray());
+      }
+      return new ResultSet(result);
+
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      r.end();
+    }
+    return null;
+  }
+
+  public static ResultSet hsdev(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+    try {
+      r.eval("source('" + rScriptDir + "/hsdev.r" + "')");
+
+      int n = Integer.parseInt(configs.get("hsdev.n"));
+      r.eval("n <- " + n);
+
+      int nhp = Integer.parseInt(configs.get("hsdev.nhp"));
+      r.eval("nhp <- " + nhp);
+
+      long interval = Long.parseLong(configs.get("hsdev.interval"));
+      r.eval("interval <- " + interval);
+
+      long period = Long.parseLong(configs.get("hsdev.period"));
+      r.eval("period <- " + period);
+
+      loadDataSets(r, trainData, testData);
+
+      r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)");
+      REXP exp = r.eval("an2");
+      RVector cont = (RVector) exp.getContent();
+
+      List<double[]> result = new ArrayList();
+      for (int i = 0; i < cont.size(); i++) {
+        result.add(cont.at(i).asDoubleArray());
+      }
+      return new ResultSet(result);
+    } catch (Exception e) {
+      LOG.error(e);
+    } finally {
+      r.end();
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java
new file mode 100644
index 0000000..2a205d1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.core;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.hsdev.HsdevTechnique;
+import org.apache.ambari.metrics.alertservice.prototype.methods.kstest.KSTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+public class TrendADSystem implements Serializable {
+
+  private MetricsCollectorInterface metricsCollectorInterface;
+  private List<TrendMetric> trendMetrics;
+
+  private long ksTestIntervalMillis = 10 * 60 * 1000;
+  private long ksTrainIntervalMillis = 10 * 60 * 1000;
+  private KSTechnique ksTechnique;
+
+  private HsdevTechnique hsdevTechnique;
+  private int hsdevNumHistoricalPeriods = 3;
+
+  private Map<KsSingleRunKey, MetricAnomaly> trackedKsAnomalies = new HashMap<>();
+  private static final Log LOG = LogFactory.getLog(TrendADSystem.class);
+  private String inputFile = "";
+
+  public TrendADSystem(MetricsCollectorInterface metricsCollectorInterface,
+                       long ksTestIntervalMillis,
+                       long ksTrainIntervalMillis,
+                       int hsdevNumHistoricalPeriods) {
+
+    this.metricsCollectorInterface = metricsCollectorInterface;
+    this.ksTestIntervalMillis = ksTestIntervalMillis;
+    this.ksTrainIntervalMillis = ksTrainIntervalMillis;
+    this.hsdevNumHistoricalPeriods = hsdevNumHistoricalPeriods;
+
+    this.ksTechnique = new KSTechnique();
+    this.hsdevTechnique = new HsdevTechnique();
+
+    trendMetrics = new ArrayList<>();
+  }
+
+  public void runKSTest(long currentEndTime, Set<TrendMetric> trendMetrics) {
+    readInputFile(inputFile);
+
+    long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis;
+    LOG.info("Running KS Test for test data interval [" + new Date(ksTestIntervalStartTime) + " : " +
+      new Date(currentEndTime) + "], with train data period [" + new Date(ksTestIntervalStartTime - ksTrainIntervalMillis)
+      + " : " + new Date(ksTestIntervalStartTime) + "]");
+
+    for (TrendMetric metric : trendMetrics) {
+      String metricName = metric.metricName;
+      String appId = metric.appId;
+      String hostname = metric.hostname;
+      String key = metricName + ":" + appId + ":" + hostname;
+
+      TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, ksTestIntervalStartTime - ksTrainIntervalMillis,
+        currentEndTime);
+
+      if (ksData.getMetrics().isEmpty()) {
+        LOG.info("No metrics fetched for KS, metricKey = " + key);
+        continue;
+      }
+
+      List<Double> trainTsList = new ArrayList<>();
+      List<Double> trainDataList = new ArrayList<>();
+      List<Double> testTsList = new ArrayList<>();
+      List<Double> testDataList = new ArrayList<>();
+
+      for (TimelineMetric timelineMetric : ksData.getMetrics()) {
+        for (Long timestamp : timelineMetric.getMetricValues().keySet()) {
+          if (timestamp <= ksTestIntervalStartTime) {
+            trainDataList.add(timelineMetric.getMetricValues().get(timestamp));
+            trainTsList.add((double) timestamp);
+          } else {
+            testDataList.add(timelineMetric.getMetricValues().get(timestamp));
+            testTsList.add((double) timestamp);
+          }
+        }
+      }
+
+      LOG.info("Train Data size : " + trainDataList.size() + ", Test Data Size : " + testDataList.size());
+      if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+        LOG.info("Not enough train/test data to perform KS analysis.");
+        continue;
+      }
+
+      String ksTrainSeries = "KSTrainSeries";
+      double[] trainTs = new double[trainTsList.size()];
+      double[] trainData = new double[trainTsList.size()];
+      for (int i = 0; i < trainTs.length; i++) {
+        trainTs[i] = trainTsList.get(i);
+        trainData[i] = trainDataList.get(i);
+      }
+
+      String ksTestSeries = "KSTestSeries";
+      double[] testTs = new double[testTsList.size()];
+      double[] testData = new double[testTsList.size()];
+      for (int i = 0; i < testTs.length; i++) {
+        testTs[i] = testTsList.get(i);
+        testData[i] = testDataList.get(i);
+      }
+
+      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
+
+      DataSeries ksTrainData = new DataSeries(ksTrainSeries, trainTs, trainData);
+      DataSeries ksTestData = new DataSeries(ksTestSeries, testTs, testData);
+
+      MetricAnomaly metricAnomaly = ksTechnique.runKsTest(key, ksTrainData, ksTestData);
+      if (metricAnomaly == null) {
+        LOG.info("No anomaly from KS test.");
+      } else {
+        LOG.info("Found Anomaly in KS Test. Publishing KS Anomaly metric....");
+        TimelineMetric timelineMetric = getAsTimelineMetric(metricAnomaly,
+          ksTestIntervalStartTime, currentEndTime, ksTestIntervalStartTime - ksTrainIntervalMillis, ksTestIntervalStartTime);
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        metricsCollectorInterface.emitMetrics(timelineMetrics);
+
+        trackedKsAnomalies.put(new KsSingleRunKey(ksTestIntervalStartTime, currentEndTime, metricName, appId, hostname), metricAnomaly);
+      }
+    }
+
+    if (trendMetrics.isEmpty()) {
+      LOG.info("No Trend metrics tracked!!!!");
+    }
+
+  }
+
+  private TimelineMetric getAsTimelineMetric(MetricAnomaly metricAnomaly,
+                                   long testStart,
+                                   long testEnd,
+                                   long trainStart,
+                                   long trainEnd) {
+
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(metricAnomaly.getMetricKey());
+    timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-" + metricAnomaly.getMethodType());
+    timelineMetric.setInstanceId(null);
+    timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+    timelineMetric.setStartTime(testEnd);
+    HashMap<String, String> metadata = new HashMap<>();
+    metadata.put("method", metricAnomaly.getMethodType());
+    metadata.put("anomaly-score", String.valueOf(metricAnomaly.getAnomalyScore()));
+    metadata.put("test-start-time", String.valueOf(testStart));
+    metadata.put("train-start-time", String.valueOf(trainStart));
+    metadata.put("train-end-time", String.valueOf(trainEnd));
+    timelineMetric.setMetadata(metadata);
+    TreeMap<Long,Double> metricValues = new TreeMap<>();
+    metricValues.put(testEnd, metricAnomaly.getMetricValue());
+    timelineMetric.setMetricValues(metricValues);
+    return timelineMetric;
+
+  }
+
+  public void runHsdevMethod() {
+
+    List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>();
+
+    for (KsSingleRunKey ksSingleRunKey : trackedKsAnomalies.keySet()) {
+
+      long hsdevTestEnd = ksSingleRunKey.endTime;
+      long hsdevTestStart = ksSingleRunKey.startTime;
+
+      long period = hsdevTestEnd - hsdevTestStart;
+
+      long hsdevTrainStart = hsdevTestStart - (hsdevNumHistoricalPeriods) * period;
+      long hsdevTrainEnd = hsdevTestStart;
+
+      LOG.info("Running HSdev Test for test data interval [" + new Date(hsdevTestStart) + " : " +
+        new Date(hsdevTestEnd) + "], with train data period [" + new Date(hsdevTrainStart)
+        + " : " + new Date(hsdevTrainEnd) + "]");
+
+      String metricName = ksSingleRunKey.metricName;
+      String appId = ksSingleRunKey.appId;
+      String hostname = ksSingleRunKey.hostname;
+      String key = metricName + "_" + appId + "_" + hostname;
+
+      TimelineMetrics hsdevData = metricsCollectorInterface.fetchMetrics(
+        metricName,
+        appId,
+        hostname,
+        hsdevTrainStart,
+        hsdevTestEnd);
+
+      if (hsdevData.getMetrics().isEmpty()) {
+        LOG.info("No metrics fetched for HSDev, metricKey = " + key);
+        continue;
+      }
+
+      List<Double> trainTsList = new ArrayList<>();
+      List<Double> trainDataList = new ArrayList<>();
+      List<Double> testTsList = new ArrayList<>();
+      List<Double> testDataList = new ArrayList<>();
+
+      for (TimelineMetric timelineMetric : hsdevData.getMetrics()) {
+        for (Long timestamp : timelineMetric.getMetricValues().keySet()) {
+          if (timestamp <= hsdevTestStart) {
+            trainDataList.add(timelineMetric.getMetricValues().get(timestamp));
+            trainTsList.add((double) timestamp);
+          } else {
+            testDataList.add(timelineMetric.getMetricValues().get(timestamp));
+            testTsList.add((double) timestamp);
+          }
+        }
+      }
+
+      if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+        LOG.info("Not enough train/test data to perform Hsdev analysis.");
+        continue;
+      }
+
+      String hsdevTrainSeries = "HsdevTrainSeries";
+      double[] trainTs = new double[trainTsList.size()];
+      double[] trainData = new double[trainTsList.size()];
+      for (int i = 0; i < trainTs.length; i++) {
+        trainTs[i] = trainTsList.get(i);
+        trainData[i] = trainDataList.get(i);
+      }
+
+      String hsdevTestSeries = "HsdevTestSeries";
+      double[] testTs = new double[testTsList.size()];
+      double[] testData = new double[testTsList.size()];
+      for (int i = 0; i < testTs.length; i++) {
+        testTs[i] = testTsList.get(i);
+        testData[i] = testDataList.get(i);
+      }
+
+      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
+
+      DataSeries hsdevTrainData = new DataSeries(hsdevTrainSeries, trainTs, trainData);
+      DataSeries hsdevTestData = new DataSeries(hsdevTestSeries, testTs, testData);
+
+      MetricAnomaly metricAnomaly = hsdevTechnique.runHsdevTest(key, hsdevTrainData, hsdevTestData);
+      if (metricAnomaly == null) {
+        LOG.info("No anomaly from Hsdev test. Mismatch between KS and HSDev. ");
+        ksTechnique.updateModel(key, false, 10);
+      } else {
+        LOG.info("Found Anomaly in Hsdev Test. This confirms KS anomaly.");
+        hsdevMetricAnomalies.add(getAsTimelineMetric(metricAnomaly,
+          hsdevTestStart, hsdevTestEnd, hsdevTrainStart, hsdevTrainEnd));
+      }
+    }
+    clearTrackedKsRunKeys();
+
+    if (!hsdevMetricAnomalies.isEmpty()) {
+      LOG.info("Publishing Hsdev Anomalies....");
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(hsdevMetricAnomalies);
+      metricsCollectorInterface.emitMetrics(timelineMetrics);
+    }
+  }
+
+  private void clearTrackedKsRunKeys() {
+    trackedKsAnomalies.clear();
+  }
+
+  private void readInputFile(String fileName) {
+    trendMetrics.clear();
+    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
+      for (String line; (line = br.readLine()) != null; ) {
+        String[] splits = line.split(",");
+        LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]);
+        trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2]));
+      }
+    } catch (IOException e) {
+      LOG.error("Error reading input file : " + e);
+    }
+  }
+
+  class KsSingleRunKey implements Serializable{
+
+    long startTime;
+    long endTime;
+    String metricName;
+    String appId;
+    String hostname;
+
+    public KsSingleRunKey(long startTime, long endTime, String metricName, String appId, String hostname) {
+      this.startTime = startTime;
+      this.endTime = endTime;
+      this.metricName = metricName;
+      this.appId = appId;
+      this.hostname = hostname;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java
new file mode 100644
index 0000000..0640142
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.core;
+
+import java.io.Serializable;
+
+public class TrendMetric implements Serializable {
+
+  String metricName;
+  String appId;
+  String hostname;
+
+  public TrendMetric(String metricName, String appId, String hostname) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.hostname = hostname;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
new file mode 100644
index 0000000..0b10b4b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.sql.Time;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AnomalyDetectionTechnique {
+
+  protected String methodType;
+
+  public abstract List<MetricAnomaly> test(TimelineMetric metric);
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
new file mode 100644
index 0000000..da4f030
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetricAnomaly implements Serializable{
+
+  private String methodType;
+  private double anomalyScore;
+  private String metricKey;
+  private long timestamp;
+  private double metricValue;
+
+
+  public MetricAnomaly(String metricKey, long timestamp, double metricValue, String methodType, double anomalyScore) {
+    this.metricKey = metricKey;
+    this.timestamp = timestamp;
+    this.metricValue = metricValue;
+    this.methodType = methodType;
+    this.anomalyScore = anomalyScore;
+
+  }
+
+  public String getMethodType() {
+    return methodType;
+  }
+
+  public void setMethodType(String methodType) {
+    this.methodType = methodType;
+  }
+
+  public double getAnomalyScore() {
+    return anomalyScore;
+  }
+
+  public void setAnomalyScore(double anomalyScore) {
+    this.anomalyScore = anomalyScore;
+  }
+
+  public void setMetricKey(String metricKey) {
+    this.metricKey = metricKey;
+  }
+
+  public String getMetricKey() {
+    return metricKey;
+  }
+
+  public void setMetricName(String metricName) {
+    this.metricKey = metricName;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  public double getMetricValue() {
+    return metricValue;
+  }
+
+  public void setMetricValue(double metricValue) {
+    this.metricValue = metricValue;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
new file mode 100644
index 0000000..a31410d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods.ema;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+
+import static org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique.suppressAnomaliesTheshold;
+
+@XmlRootElement
+public class EmaModel implements Serializable {
+
+  private String metricName;
+  private String hostname;
+  private String appId;
+  private double ema;
+  private double ems;
+  private double weight;
+  private double timessdev;
+
+  private int ctr = 0;
+
+  private static final Log LOG = LogFactory.getLog(EmaModel.class);
+
+  public EmaModel(String name, String hostname, String appId, double weight, double timessdev) {
+    this.metricName = name;
+    this.hostname = hostname;
+    this.appId = appId;
+    this.weight = weight;
+    this.timessdev = timessdev;
+    this.ema = 0.0;
+    this.ems = 0.0;
+  }
+
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public String getHostname() {
+    return hostname;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public double testAndUpdate(double metricValue) {
+
+    double anomalyScore = 0.0;
+    LOG.info("Before Update ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + ", timessdev = " + timessdev);
+    update(metricValue);
+    if (ctr > suppressAnomaliesTheshold) {
+      anomalyScore = test(metricValue);
+      if (anomalyScore > 0.0) {
+        LOG.info("Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems +
+          ", timessdev = " + timessdev + ", metricValue = " + metricValue);
+      } else {
+        LOG.info("Not an Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems +
+          ", timessdev = " + timessdev + ", metricValue = " + metricValue);
+      }
+    } else {
+      ctr++;
+      if (ctr > suppressAnomaliesTheshold) {
+        LOG.info("Ema Model for " + metricName + ":" + appId + ":" + hostname + " is ready for testing data.");
+      }
+    }
+    return anomalyScore;
+  }
+
+  public void update(double metricValue) {
+    ema = weight * ema + (1 - weight) * metricValue;
+    ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0));
+    LOG.debug("In update : ema = " + ema + ", ems = " + ems);
+  }
+
+  public double test(double metricValue) {
+    LOG.debug("In test : ema = " + ema + ", ems = " + ems);
+    double diff = Math.abs(ema - metricValue) - (timessdev * ems);
+    LOG.debug("diff = " + diff);
+    if (diff > 0) {
+      return Math.abs((metricValue - ema) / ems); //Z score
+    } else {
+      return 0.0;
+    }
+  }
+
+  public void updateModel(boolean increaseSensitivity, double percent) {
+    LOG.info("Updating model for " + metricName + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent);
+    double delta = percent / 100;
+    if (increaseSensitivity) {
+      delta = delta * -1;
+    }
+    this.timessdev = timessdev + delta * timessdev;
+    //this.weight = Math.min(1.0, weight + delta * weight);
+    LOG.info("New model parameters " + metricName + " : timessdev = " + timessdev + ", weight = " + weight);
+  }
+
+  public double getWeight() {
+    return weight;
+  }
+
+  public void setWeight(double weight) {
+    this.weight = weight;
+  }
+
+  public double getTimessdev() {
+    return timessdev;
+  }
+
+  public void setTimessdev(double timessdev) {
+    this.timessdev = timessdev;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
new file mode 100644
index 0000000..62749c1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods.ema;
+
+import com.google.gson.Gson;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.spark.SparkContext;
+import org.apache.spark.mllib.util.Loader;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+public class EmaModelLoader implements Loader<EmaTechnique> {
+    private static final Log LOG = LogFactory.getLog(EmaModelLoader.class);
+
+    @Override
+    public EmaTechnique load(SparkContext sc, String path) {
+        return new EmaTechnique(0.5,3);
+//        Gson gson = new Gson();
+//        try {
+//            String fileString = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
+//            return gson.fromJson(fileString, EmaTechnique.class);
+//        } catch (IOException e) {
+//            LOG.error(e);
+//        }
+//        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
new file mode 100644
index 0000000..52c6cf3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods.ema;
+
+import com.google.gson.Gson;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.AnomalyDetectionTechnique;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.spark.SparkContext;
+import org.apache.spark.mllib.util.Saveable;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@XmlRootElement
+public class EmaTechnique extends AnomalyDetectionTechnique implements Serializable, Saveable {
+
+  @XmlElement(name = "trackedEmas")
+  private Map<String, EmaModel> trackedEmas;
+  private static final Log LOG = LogFactory.getLog(EmaTechnique.class);
+
+  private double startingWeight = 0.5;
+  private double startTimesSdev = 3.0;
+  private String methodType = "ema";
+  public static int suppressAnomaliesTheshold = 100;
+
+  public EmaTechnique(double startingWeight, double startTimesSdev, int suppressAnomaliesTheshold) {
+    trackedEmas = new HashMap<>();
+    this.startingWeight = startingWeight;
+    this.startTimesSdev = startTimesSdev;
+    EmaTechnique.suppressAnomaliesTheshold = suppressAnomaliesTheshold;
+    LOG.info("New EmaTechnique......");
+  }
+
+  public EmaTechnique(double startingWeight, double startTimesSdev) {
+    trackedEmas = new HashMap<>();
+    this.startingWeight = startingWeight;
+    this.startTimesSdev = startTimesSdev;
+    LOG.info("New EmaTechnique......");
+  }
+
+  public List<MetricAnomaly> test(TimelineMetric metric) {
+    String metricName = metric.getMetricName();
+    String appId = metric.getAppId();
+    String hostname = metric.getHostName();
+    String key = metricName + ":" + appId + ":" + hostname;
+
+    EmaModel emaModel = trackedEmas.get(key);
+    if (emaModel == null) {
+      LOG.debug("EmaModel not present for " + key);
+      LOG.debug("Number of tracked Emas : " + trackedEmas.size());
+      emaModel  = new EmaModel(metricName, hostname, appId, startingWeight, startTimesSdev);
+      trackedEmas.put(key, emaModel);
+    } else {
+      LOG.debug("EmaModel already present for " + key);
+    }
+
+    List<MetricAnomaly> anomalies = new ArrayList<>();
+
+    for (Long timestamp : metric.getMetricValues().keySet()) {
+      double metricValue = metric.getMetricValues().get(timestamp);
+      double anomalyScore = emaModel.testAndUpdate(metricValue);
+      if (anomalyScore > 0.0) {
+        LOG.info("Found anomaly for : " + key + ", anomalyScore = " + anomalyScore);
+        MetricAnomaly metricAnomaly = new MetricAnomaly(key, timestamp, metricValue, methodType, anomalyScore);
+        anomalies.add(metricAnomaly);
+      } else {
+        LOG.debug("Discarding non-anomaly for : " + key);
+      }
+    }
+    return anomalies;
+  }
+
+  public boolean updateModel(TimelineMetric timelineMetric, boolean increaseSensitivity, double percent) {
+    String metricName = timelineMetric.getMetricName();
+    String appId = timelineMetric.getAppId();
+    String hostname = timelineMetric.getHostName();
+    String key = metricName + "_" + appId + "_" + hostname;
+
+
+    EmaModel emaModel = trackedEmas.get(key);
+
+    if (emaModel == null) {
+      LOG.warn("EMA Model for " + key + " not found");
+      return false;
+    }
+    emaModel.updateModel(increaseSensitivity, percent);
+
+    return true;
+  }
+
+  @Override
+  public void save(SparkContext sc, String path) {
+    Gson gson = new Gson();
+    try {
+      String json = gson.toJson(this);
+      try (Writer writer = new BufferedWriter(new OutputStreamWriter(
+        new FileOutputStream(path), "utf-8"))) {
+        writer.write(json);
+      }
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+
+  @Override
+  public String formatVersion() {
+    return "1.0";
+  }
+
+  public Map<String, EmaModel> getTrackedEmas() {
+    return trackedEmas;
+  }
+
+  public double getStartingWeight() {
+    return startingWeight;
+  }
+
+  public double getStartTimesSdev() {
+    return startTimesSdev;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
new file mode 100644
index 0000000..04f4a73
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods.hsdev;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import static org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.median;
+import static org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.sdev;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HsdevTechnique implements Serializable {
+
+  private Map<String, Double> hsdevMap;
+  private String methodType = "hsdev";
+  private static final Log LOG = LogFactory.getLog(HsdevTechnique.class);
+
+  public HsdevTechnique() {
+    hsdevMap = new HashMap<>();
+  }
+
+  public MetricAnomaly runHsdevTest(String key, DataSeries trainData, DataSeries testData) {
+    int testLength = testData.values.length;
+    int trainLength = trainData.values.length;
+
+    if (trainLength < testLength) {
+      LOG.info("Not enough train data.");
+      return null;
+    }
+
+    if (!hsdevMap.containsKey(key)) {
+      hsdevMap.put(key, 3.0);
+    }
+
+    double n = hsdevMap.get(key);
+
+    double historicSd = sdev(trainData.values, false);
+    double historicMedian = median(trainData.values);
+    double currentMedian = median(testData.values);
+
+
+    if (historicSd > 0) {
+      double diff = Math.abs(currentMedian - historicMedian);
+      LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1]));
+      LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd);
+
+      if (diff > n * historicSd) {
+        double zScore = diff / historicSd;
+        LOG.info("Z Score of current series : " + zScore);
+        return new MetricAnomaly(key,
+          (long) testData.ts[testLength - 1],
+          testData.values[testLength - 1],
+          methodType,
+          zScore);
+      }
+    }
+
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java
new file mode 100644
index 0000000..a9360d3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.prototype.methods.kstest;
+
+import org.apache.ambari.metrics.alertservice.prototype.core.RFunctionInvoker;
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class KSTechnique implements Serializable {
+
+  private String methodType = "ks";
+  private Map<String, Double> pValueMap;
+  private static final Log LOG = LogFactory.getLog(KSTechnique.class);
+
+  public KSTechnique() {
+    pValueMap = new HashMap();
+  }
+
+  public MetricAnomaly runKsTest(String key, DataSeries trainData, DataSeries testData) {
+
+    int testLength = testData.values.length;
+    int trainLength = trainData.values.length;
+
+    if (trainLength < testLength) {
+      LOG.info("Not enough train data.");
+      return null;
+    }
+
+    if (!pValueMap.containsKey(key)) {
+      pValueMap.put(key, 0.05);
+    }
+    double pValue = pValueMap.get(key);
+
+    ResultSet result = RFunctionInvoker.ksTest(trainData, testData, Collections.singletonMap("ks.p_value", String.valueOf(pValue)));
+    if (result == null) {
+      LOG.error("Resultset is null when invoking KS R function...");
+      return null;
+    }
+
+    if (result.resultset.size() > 0) {
+
+      LOG.info("Is size 1 ? result size = " + result.resultset.get(0).length);
+      LOG.info("p_value = " + result.resultset.get(3)[0]);
+      double dValue = result.resultset.get(2)[0];
+
+      return new MetricAnomaly(key,
+        (long) testData.ts[testLength - 1],
+        testData.values[testLength - 1],
+        methodType,
+        dValue);
+    }
+
+    return null;
+  }
+
+  public void updateModel(String metricKey, boolean increaseSensitivity, double percent) {
+
+    LOG.info("Updating KS model for " + metricKey + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent);
+
+    if (!pValueMap.containsKey(metricKey)) {
+      LOG.error("Unknown metric key : " + metricKey);
+      LOG.info("pValueMap :" + pValueMap.toString());
+      return;
+    }
+
+    double delta = percent / 100;
+    if (!increaseSensitivity) {
+      delta = delta * -1;
+    }
+
+    double pValue = pValueMap.get(metricKey);
+    double newPValue = Math.min(1.0, pValue + delta * pValue);
+    pValueMap.put(metricKey, newPValue);
+    LOG.info("New pValue = " + newPValue);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java
new file mode 100644
index 0000000..268cd15
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.prototype.testing.utilities;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.List;
+import java.util.Map;
+
+@XmlRootElement
+public class MetricAnomalyDetectorTestInput {
+
+  public MetricAnomalyDetectorTestInput() {
+  }
+
+  //Train data
+  private String trainDataName;
+  private String trainDataType;
+  private Map<String, String> trainDataConfigs;
+  private int trainDataSize;
+
+  //Test data
+  private String testDataName;
+  private String testDataType;
+  private Map<String, String> testDataConfigs;
+  private int testDataSize;
+
+  //Algorithm data
+  private List<String> methods;
+  private Map<String, String> methodConfigs;
+
+  public String getTrainDataName() {
+    return trainDataName;
+  }
+
+  public void setTrainDataName(String trainDataName) {
+    this.trainDataName = trainDataName;
+  }
+
+  public String getTrainDataType() {
+    return trainDataType;
+  }
+
+  public void setTrainDataType(String trainDataType) {
+    this.trainDataType = trainDataType;
+  }
+
+  public Map<String, String> getTrainDataConfigs() {
+    return trainDataConfigs;
+  }
+
+  public void setTrainDataConfigs(Map<String, String> trainDataConfigs) {
+    this.trainDataConfigs = trainDataConfigs;
+  }
+
+  public String getTestDataName() {
+    return testDataName;
+  }
+
+  public void setTestDataName(String testDataName) {
+    this.testDataName = testDataName;
+  }
+
+  public String getTestDataType() {
+    return testDataType;
+  }
+
+  public void setTestDataType(String testDataType) {
+    this.testDataType = testDataType;
+  }
+
+  public Map<String, String> getTestDataConfigs() {
+    return testDataConfigs;
+  }
+
+  public void setTestDataConfigs(Map<String, String> testDataConfigs) {
+    this.testDataConfigs = testDataConfigs;
+  }
+
+  public Map<String, String> getMethodConfigs() {
+    return methodConfigs;
+  }
+
+  public void setMethodConfigs(Map<String, String> methodConfigs) {
+    this.methodConfigs = methodConfigs;
+  }
+
+  public int getTrainDataSize() {
+    return trainDataSize;
+  }
+
+  public void setTrainDataSize(int trainDataSize) {
+    this.trainDataSize = trainDataSize;
+  }
+
+  public int getTestDataSize() {
+    return testDataSize;
+  }
+
+  public void setTestDataSize(int testDataSize) {
+    this.testDataSize = testDataSize;
+  }
+
+  public List<String> getMethods() {
+    return methods;
+  }
+
+  public void setMethods(List<String> methods) {
+    this.methods = methods;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyTester.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyTester.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyTester.java
new file mode 100644
index 0000000..6485ebb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyTester.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.prototype.testing.utilities;
+
+import org.apache.ambari.metrics.alertservice.prototype.core.MetricsCollectorInterface;
+import org.apache.ambari.metrics.alertservice.prototype.core.RFunctionInvoker;
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.seriesgenerator.MetricSeriesGeneratorFactory;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class MetricAnomalyTester {
+
+  public static String appId = MetricsCollectorInterface.serviceName;
+  static final Log LOG = LogFactory.getLog(MetricAnomalyTester.class);
+  static Map<String, TimelineMetric> timelineMetricMap = new HashMap<>();
+
+  public static TimelineMetrics runTestAnomalyRequest(MetricAnomalyDetectorTestInput input) throws UnknownHostException {
+
+    long currentTime = System.currentTimeMillis();
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    String hostname = InetAddress.getLocalHost().getHostName();
+
+    //Train data
+    TimelineMetric metric1 = new TimelineMetric();
+    if (StringUtils.isNotEmpty(input.getTrainDataName())) {
+      metric1 = timelineMetricMap.get(input.getTrainDataName());
+      if (metric1 == null) {
+        metric1 = new TimelineMetric();
+        double[] trainSeries = MetricSeriesGeneratorFactory.generateSeries(input.getTrainDataType(), input.getTrainDataSize(), input.getTrainDataConfigs());
+        metric1.setMetricName(input.getTrainDataName());
+        metric1.setAppId(appId);
+        metric1.setHostName(hostname);
+        metric1.setStartTime(currentTime);
+        metric1.setInstanceId(null);
+        metric1.setMetricValues(getAsTimeSeries(currentTime, trainSeries));
+        timelineMetricMap.put(input.getTrainDataName(), metric1);
+      }
+      timelineMetrics.getMetrics().add(metric1);
+    } else {
+      LOG.error("No train data name specified");
+    }
+
+    //Test data
+    TimelineMetric metric2 = new TimelineMetric();
+    if (StringUtils.isNotEmpty(input.getTestDataName())) {
+      metric2 = timelineMetricMap.get(input.getTestDataName());
+      if (metric2 == null) {
+        metric2 = new TimelineMetric();
+        double[] testSeries = MetricSeriesGeneratorFactory.generateSeries(input.getTestDataType(), input.getTestDataSize(), input.getTestDataConfigs());
+        metric2.setMetricName(input.getTestDataName());
+        metric2.setAppId(appId);
+        metric2.setHostName(hostname);
+        metric2.setStartTime(currentTime);
+        metric2.setInstanceId(null);
+        metric2.setMetricValues(getAsTimeSeries(currentTime, testSeries));
+        timelineMetricMap.put(input.getTestDataName(), metric2);
+      }
+      timelineMetrics.getMetrics().add(metric2);
+    } else {
+      LOG.warn("No test data name specified");
+    }
+
+    //Invoke method
+    if (CollectionUtils.isNotEmpty(input.getMethods())) {
+      RFunctionInvoker.setScriptsDir("/etc/ambari-metrics-collector/conf/R-scripts");
+      for (String methodType : input.getMethods()) {
+        ResultSet result = RFunctionInvoker.executeMethod(methodType, getAsDataSeries(metric1), getAsDataSeries(metric2), input.getMethodConfigs());
+        TimelineMetric timelineMetric = getAsTimelineMetric(result, methodType, input, currentTime, hostname);
+        if (timelineMetric != null) {
+          timelineMetrics.getMetrics().add(timelineMetric);
+        }
+      }
+    } else {
+      LOG.warn("No anomaly method requested");
+    }
+
+    return timelineMetrics;
+  }
+
+
+  private static TimelineMetric getAsTimelineMetric(ResultSet result, String methodType, MetricAnomalyDetectorTestInput input, long currentTime, String hostname) {
+
+    if (result == null) {
+      return null;
+    }
+
+    TimelineMetric timelineMetric = new TimelineMetric();
+    if (methodType.equals("tukeys") || methodType.equals("ema")) {
+      timelineMetric.setMetricName(input.getTrainDataName() + "_" + input.getTestDataName() + "_" + methodType + "_" + currentTime);
+      timelineMetric.setHostName(hostname);
+      timelineMetric.setAppId(appId);
+      timelineMetric.setInstanceId(null);
+      timelineMetric.setStartTime(currentTime);
+
+      TreeMap<Long, Double> metricValues = new TreeMap<>();
+      if (result.resultset.size() > 0) {
+        double[] ts = result.resultset.get(0);
+        double[] metrics = result.resultset.get(1);
+        for (int i = 0; i < ts.length; i++) {
+          if (i == 0) {
+            timelineMetric.setStartTime((long) ts[i]);
+          }
+          metricValues.put((long) ts[i], metrics[i]);
+        }
+      }
+      timelineMetric.setMetricValues(metricValues);
+      return timelineMetric;
+    }
+    return null;
+  }
+
+
+  private static TreeMap<Long, Double> getAsTimeSeries(long currentTime, double[] values) {
+
+    long startTime = currentTime - (values.length - 1) * 60 * 1000;
+    TreeMap<Long, Double> metricValues = new TreeMap<>();
+
+    for (int i = 0; i < values.length; i++) {
+      metricValues.put(startTime, values[i]);
+      startTime += (60 * 1000);
+    }
+    return metricValues;
+  }
+
+  private static DataSeries getAsDataSeries(TimelineMetric timelineMetric) {
+
+    TreeMap<Long, Double> metricValues = timelineMetric.getMetricValues();
+    double[] timestamps = new double[metricValues.size()];
+    double[] values = new double[metricValues.size()];
+    int i = 0;
+
+    for (Long timestamp : metricValues.keySet()) {
+      timestamps[i] = timestamp;
+      values[i++] = metricValues.get(timestamp);
+    }
+    return new DataSeries(timelineMetric.getMetricName() + "_" + timelineMetric.getAppId() + "_" + timelineMetric.getHostName(), timestamps, values);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestMetricSeriesGenerator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestMetricSeriesGenerator.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestMetricSeriesGenerator.java
new file mode 100644
index 0000000..b817f3e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestMetricSeriesGenerator.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.testing.utilities;
+
+/**
+ * Class which was originally used to send test series from AMS to Spark through Kafka.
+ */
+
+public class TestMetricSeriesGenerator {
+  //implements Runnable {
+
+//  private Map<TestSeriesInputRequest, AbstractMetricSeries> configuredSeries = new HashMap<>();
+//  private static final Log LOG = LogFactory.getLog(TestMetricSeriesGenerator.class);
+//  private TimelineMetricStore metricStore;
+//  private String hostname;
+//
+//  public TestMetricSeriesGenerator(TimelineMetricStore metricStore) {
+//    this.metricStore = metricStore;
+//    try {
+//      this.hostname = InetAddress.getLocalHost().getHostName();
+//    } catch (UnknownHostException e) {
+//      e.printStackTrace();
+//    }
+//  }
+//
+//  public void addSeries(TestSeriesInputRequest inputRequest) {
+//    if (!configuredSeries.containsKey(inputRequest)) {
+//      AbstractMetricSeries metricSeries = MetricSeriesGeneratorFactory.generateSeries(inputRequest.getSeriesType(), inputRequest.getConfigs());
+//      configuredSeries.put(inputRequest, metricSeries);
+//      LOG.info("Added series " + inputRequest.getSeriesName());
+//    }
+//  }
+//
+//  public void removeSeries(String seriesName) {
+//    boolean isPresent = false;
+//    TestSeriesInputRequest tbd = null;
+//    for (TestSeriesInputRequest inputRequest : configuredSeries.keySet()) {
+//      if (inputRequest.getSeriesName().equals(seriesName)) {
+//        isPresent = true;
+//        tbd = inputRequest;
+//      }
+//    }
+//    if (isPresent) {
+//      LOG.info("Removing series " + seriesName);
+//      configuredSeries.remove(tbd);
+//    } else {
+//      LOG.info("Series not found : " + seriesName);
+//    }
+//  }
+//
+//  @Override
+//  public void run() {
+//    long currentTime = System.currentTimeMillis();
+//    TimelineMetrics timelineMetrics = new TimelineMetrics();
+//
+//    for (TestSeriesInputRequest input : configuredSeries.keySet()) {
+//      AbstractMetricSeries metricSeries = configuredSeries.get(input);
+//      TimelineMetric timelineMetric = new TimelineMetric();
+//      timelineMetric.setMetricName(input.getSeriesName());
+//      timelineMetric.setAppId("anomaly-engine-test-metric");
+//      timelineMetric.setInstanceId(null);
+//      timelineMetric.setStartTime(currentTime);
+//      timelineMetric.setHostName(hostname);
+//      TreeMap<Long, Double> metricValues = new TreeMap();
+//      metricValues.put(currentTime, metricSeries.nextValue());
+//      timelineMetric.setMetricValues(metricValues);
+//      timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+//      LOG.info("Emitting metric with appId = " + timelineMetric.getAppId());
+//    }
+//    try {
+//      LOG.info("Publishing test metrics for " + timelineMetrics.getMetrics().size() + " series.");
+//      metricStore.putMetrics(timelineMetrics);
+//    } catch (Exception e) {
+//      LOG.error(e);
+//    }
+//  }
+}