You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2017/09/27 21:04:52 UTC

[2/6] ambari git commit: AMBARI-22077 : Create maven module and package structure for the anomaly detection engine. (avijayan)

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestSeriesInputRequest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestSeriesInputRequest.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestSeriesInputRequest.java
new file mode 100644
index 0000000..a424f8e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestSeriesInputRequest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.testing.utilities;
+
+import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Collections;
+import java.util.Map;
+
+@XmlRootElement
+public class TestSeriesInputRequest {
+
+  private String seriesName;
+  private String seriesType;
+  private Map<String, String> configs;
+
+  public TestSeriesInputRequest() {
+  }
+
+  public TestSeriesInputRequest(String seriesName, String seriesType, Map<String, String> configs) {
+    this.seriesName = seriesName;
+    this.seriesType = seriesType;
+    this.configs = configs;
+  }
+
+  public String getSeriesName() {
+    return seriesName;
+  }
+
+  public void setSeriesName(String seriesName) {
+    this.seriesName = seriesName;
+  }
+
+  public String getSeriesType() {
+    return seriesType;
+  }
+
+  public void setSeriesType(String seriesType) {
+    this.seriesType = seriesType;
+  }
+
+  public Map<String, String> getConfigs() {
+    return configs;
+  }
+
+  public void setConfigs(Map<String, String> configs) {
+    this.configs = configs;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    TestSeriesInputRequest anotherInput = (TestSeriesInputRequest)o;
+    return anotherInput.getSeriesName().equals(this.getSeriesName());
+  }
+
+  @Override
+  public int hashCode() {
+    return seriesName.hashCode();
+  }
+
+  public static void main(String[] args) {
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    TestSeriesInputRequest testSeriesInputRequest = new TestSeriesInputRequest("test", "ema", Collections.singletonMap("key","value"));
+    try {
+      System.out.print(objectMapper.writeValueAsString(testSeriesInputRequest));
+    } catch (JsonProcessingException e) {
+      e.printStackTrace();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
new file mode 100644
index 0000000..a8e31bf
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+public interface AbstractMetricSeries {
+
+  public double nextValue();
+  public double[] getSeries(int n);
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java
new file mode 100644
index 0000000..4158ff4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+public class DualBandMetricSeries implements AbstractMetricSeries {
+
+  double lowBandValue = 0.0;
+  double lowBandDeviationPercentage = 0.0;
+  int lowBandPeriodSize = 10;
+  double highBandValue = 1.0;
+  double highBandDeviationPercentage = 0.0;
+  int highBandPeriodSize = 10;
+
+  Random random = new Random();
+  double lowBandValueLowerLimit, lowBandValueHigherLimit;
+  double highBandLowerLimit, highBandUpperLimit;
+  int l = 0, h = 0;
+
+  public DualBandMetricSeries(double lowBandValue,
+                              double lowBandDeviationPercentage,
+                              int lowBandPeriodSize,
+                              double highBandValue,
+                              double highBandDeviationPercentage,
+                              int highBandPeriodSize) {
+    this.lowBandValue = lowBandValue;
+    this.lowBandDeviationPercentage = lowBandDeviationPercentage;
+    this.lowBandPeriodSize = lowBandPeriodSize;
+    this.highBandValue = highBandValue;
+    this.highBandDeviationPercentage = highBandDeviationPercentage;
+    this.highBandPeriodSize = highBandPeriodSize;
+    init();
+  }
+
+  private void init() {
+    lowBandValueLowerLimit = lowBandValue - lowBandDeviationPercentage * lowBandValue;
+    lowBandValueHigherLimit = lowBandValue + lowBandDeviationPercentage * lowBandValue;
+    highBandLowerLimit = highBandValue - highBandDeviationPercentage * highBandValue;
+    highBandUpperLimit = highBandValue + highBandDeviationPercentage * highBandValue;
+  }
+
+  @Override
+  public double nextValue() {
+
+    double value = 0.0;
+
+    if (l < lowBandPeriodSize) {
+      value = lowBandValueLowerLimit + (lowBandValueHigherLimit - lowBandValueLowerLimit) * random.nextDouble();
+      l++;
+    } else if (h < highBandPeriodSize) {
+      value = highBandLowerLimit + (highBandUpperLimit - highBandLowerLimit) * random.nextDouble();
+      h++;
+    }
+
+    if (l == lowBandPeriodSize && h == highBandPeriodSize) {
+      l = 0;
+      h = 0;
+    }
+
+    return value;
+  }
+
+  @Override
+  public double[] getSeries(int n) {
+    double[] series = new double[n];
+    for (int i = 0; i < n; i++) {
+      series[i] = nextValue();
+    }
+    return series;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java
new file mode 100644
index 0000000..1e37ff3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Random;
+
+public class MetricSeriesGeneratorFactory {
+
+  /**
+   * Return a normally distributed data series with some deviation % and outliers.
+   *
+   * @param n                                size of the data series
+   * @param value                            The value around which the uniform data series is centered on.
+   * @param deviationPercentage              The allowed deviation % on either side of the uniform value. For example, if value = 10, and deviation % is 0.1, the series values lie between 0.9 to 1.1.
+   * @param outlierProbability               The probability of finding an outlier in the series.
+   * @param outlierDeviationLowerPercentage  min percentage outlier should be away from the uniform value in % terms. if value = 10 and outlierDeviationPercentage = 30%, the outlier is 7 and  13.
+   * @param outlierDeviationHigherPercentage max percentage outlier should be away from the uniform value in % terms. if value = 10 and outlierDeviationPercentage = 60%, the outlier is 4 and  16.
+   * @param outliersAboveValue               Outlier should be greater or smaller than the value.
+   * @return uniform series
+   */
+  public static double[] createUniformSeries(int n,
+                                             double value,
+                                             double deviationPercentage,
+                                             double outlierProbability,
+                                             double outlierDeviationLowerPercentage,
+                                             double outlierDeviationHigherPercentage,
+                                             boolean outliersAboveValue) {
+
+    UniformMetricSeries metricSeries = new UniformMetricSeries(value,
+      deviationPercentage,
+      outlierProbability,
+      outlierDeviationLowerPercentage,
+      outlierDeviationHigherPercentage,
+      outliersAboveValue);
+
+    return metricSeries.getSeries(n);
+  }
+
+
+  /**
+   * /**
+   * Returns a normally distributed series.
+   *
+   * @param n                             size of the data series
+   * @param mean                          mean of the distribution
+   * @param sd                            sd of the distribution
+   * @param outlierProbability            sd of the distribution
+   * @param outlierDeviationSDTimesLower  Lower Limit of the outlier with respect to times sdev from the mean.
+   * @param outlierDeviationSDTimesHigher Higher Limit of the outlier with respect to times sdev from the mean.
+   * @param outlierOnRightEnd             Outlier should be on the right end or the left end.
+   * @return normal series
+   */
+  public static double[] createNormalSeries(int n,
+                                            double mean,
+                                            double sd,
+                                            double outlierProbability,
+                                            double outlierDeviationSDTimesLower,
+                                            double outlierDeviationSDTimesHigher,
+                                            boolean outlierOnRightEnd) {
+
+
+    NormalMetricSeries metricSeries = new NormalMetricSeries(mean,
+      sd,
+      outlierProbability,
+      outlierDeviationSDTimesLower,
+      outlierDeviationSDTimesHigher,
+      outlierOnRightEnd);
+
+    return metricSeries.getSeries(n);
+  }
+
+
+  /**
+   * Returns a monotonically increasing / decreasing series
+   *
+   * @param n                                size of the data series
+   * @param startValue                       Start value of the monotonic sequence
+   * @param slope                            direction of monotonicity m > 0 for increasing and m < 0 for decreasing.
+   * @param deviationPercentage              The allowed deviation % on either side of the current 'y' value. For example, if current value = 10 according to slope, and deviation % is 0.1, the series values lie between 0.9 to 1.1.
+   * @param outlierProbability               The probability of finding an outlier in the series.
+   * @param outlierDeviationLowerPercentage  min percentage outlier should be away from the current 'y' value in % terms. if value = 10 and outlierDeviationPercentage = 30%, the outlier is 7 and  13.
+   * @param outlierDeviationHigherPercentage max percentage outlier should be away from the current 'y' value in % terms. if value = 10 and outlierDeviationPercentage = 60%, the outlier is 4 and  16.
+   * @param outliersAboveValue               Outlier should be greater or smaller than the 'y' value.
+   * @return
+   */
+  public static double[] createMonotonicSeries(int n,
+                                               double startValue,
+                                               double slope,
+                                               double deviationPercentage,
+                                               double outlierProbability,
+                                               double outlierDeviationLowerPercentage,
+                                               double outlierDeviationHigherPercentage,
+                                               boolean outliersAboveValue) {
+
+    MonotonicMetricSeries metricSeries = new MonotonicMetricSeries(startValue,
+      slope,
+      deviationPercentage,
+      outlierProbability,
+      outlierDeviationLowerPercentage,
+      outlierDeviationHigherPercentage,
+      outliersAboveValue);
+
+    return metricSeries.getSeries(n);
+  }
+
+
+  /**
+   * Returns a dual band series (lower and higher)
+   *
+   * @param n                           size of the data series
+   * @param lowBandValue                lower band value
+   * @param lowBandDeviationPercentage  lower band deviation
+   * @param lowBandPeriodSize           lower band
+   * @param highBandValue               high band centre value
+   * @param highBandDeviationPercentage high band deviation.
+   * @param highBandPeriodSize          high band size
+   * @return
+   */
+  public static double[] getDualBandSeries(int n,
+                                           double lowBandValue,
+                                           double lowBandDeviationPercentage,
+                                           int lowBandPeriodSize,
+                                           double highBandValue,
+                                           double highBandDeviationPercentage,
+                                           int highBandPeriodSize) {
+
+    DualBandMetricSeries metricSeries  = new DualBandMetricSeries(lowBandValue,
+      lowBandDeviationPercentage,
+      lowBandPeriodSize,
+      highBandValue,
+      highBandDeviationPercentage,
+      highBandPeriodSize);
+
+    return metricSeries.getSeries(n);
+  }
+
+  /**
+   * Returns a step function series.
+   *
+   * @param n                              size of the data series
+   * @param startValue                     start steady value
+   * @param steadyValueDeviationPercentage required devation in the steady state value
+   * @param steadyPeriodSlope              direction of monotonicity m > 0 for increasing and m < 0 for decreasing, m = 0 no increase or decrease.
+   * @param steadyPeriodMinSize            min size for step period
+   * @param steadyPeriodMaxSize            max size for step period.
+   * @param stepChangePercentage           Increase / decrease in steady state to denote a step in terms of deviation percentage from the last value.
+   * @param upwardStep                     upward or downward step.
+   * @return
+   */
+  public static double[] getStepFunctionSeries(int n,
+                                               double startValue,
+                                               double steadyValueDeviationPercentage,
+                                               double steadyPeriodSlope,
+                                               int steadyPeriodMinSize,
+                                               int steadyPeriodMaxSize,
+                                               double stepChangePercentage,
+                                               boolean upwardStep) {
+
+    StepFunctionMetricSeries metricSeries = new StepFunctionMetricSeries(startValue,
+      steadyValueDeviationPercentage,
+      steadyPeriodSlope,
+      steadyPeriodMinSize,
+      steadyPeriodMaxSize,
+      stepChangePercentage,
+      upwardStep);
+
+    return metricSeries.getSeries(n);
+  }
+
+  /**
+   * Series with small period of turbulence and then back to steady.
+   *
+   * @param n                                        size of the data series
+   * @param steadyStateValue                         steady state center value
+   * @param steadyStateDeviationPercentage           steady state deviation in percentage
+   * @param turbulentPeriodDeviationLowerPercentage  turbulent state lower limit in terms of percentage from centre value.
+   * @param turbulentPeriodDeviationHigherPercentage turbulent state higher limit in terms of percentage from centre value.
+   * @param turbulentPeriodLength                    turbulent period length (number of points)
+   * @param turbulentStatePosition                   Where the turbulent state should be 0 - at the beginning, 1 - in the middle (25% - 50% of the series), 2 - at the end of the series.
+   * @return
+   */
+  public static double[] getSteadySeriesWithTurbulentPeriod(int n,
+                                                            double steadyStateValue,
+                                                            double steadyStateDeviationPercentage,
+                                                            double turbulentPeriodDeviationLowerPercentage,
+                                                            double turbulentPeriodDeviationHigherPercentage,
+                                                            int turbulentPeriodLength,
+                                                            int turbulentStatePosition
+  ) {
+
+
+    SteadyWithTurbulenceMetricSeries metricSeries = new SteadyWithTurbulenceMetricSeries(n,
+      steadyStateValue,
+      steadyStateDeviationPercentage,
+      turbulentPeriodDeviationLowerPercentage,
+      turbulentPeriodDeviationHigherPercentage,
+      turbulentPeriodLength,
+      turbulentStatePosition);
+
+    return metricSeries.getSeries(n);
+  }
+
+
+  public static double[] generateSeries(String type, int n, Map<String, String> configs) {
+
+    double[] series;
+    switch (type) {
+
+      case "normal":
+        series = createNormalSeries(n,
+          Double.parseDouble(configs.getOrDefault("mean", "0")),
+          Double.parseDouble(configs.getOrDefault("sd", "1")),
+          Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesLower", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesHigher", "0")),
+          Boolean.parseBoolean(configs.getOrDefault("outlierOnRightEnd", "true")));
+        break;
+
+      case "uniform":
+        series = createUniformSeries(n,
+          Double.parseDouble(configs.getOrDefault("value", "10")),
+          Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")),
+          Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true")));
+        break;
+
+      case "monotonic":
+        series = createMonotonicSeries(n,
+          Double.parseDouble(configs.getOrDefault("startValue", "10")),
+          Double.parseDouble(configs.getOrDefault("slope", "0")),
+          Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")),
+          Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true")));
+        break;
+
+      case "dualband":
+        series = getDualBandSeries(n,
+          Double.parseDouble(configs.getOrDefault("lowBandValue", "10")),
+          Double.parseDouble(configs.getOrDefault("lowBandDeviationPercentage", "0")),
+          Integer.parseInt(configs.getOrDefault("lowBandPeriodSize", "0")),
+          Double.parseDouble(configs.getOrDefault("highBandValue", "10")),
+          Double.parseDouble(configs.getOrDefault("highBandDeviationPercentage", "0")),
+          Integer.parseInt(configs.getOrDefault("highBandPeriodSize", "0")));
+        break;
+
+      case "step":
+        series = getStepFunctionSeries(n,
+          Double.parseDouble(configs.getOrDefault("startValue", "10")),
+          Double.parseDouble(configs.getOrDefault("steadyValueDeviationPercentage", "0")),
+          Double.parseDouble(configs.getOrDefault("steadyPeriodSlope", "0")),
+          Integer.parseInt(configs.getOrDefault("steadyPeriodMinSize", "0")),
+          Integer.parseInt(configs.getOrDefault("steadyPeriodMaxSize", "0")),
+          Double.parseDouble(configs.getOrDefault("stepChangePercentage", "0")),
+          Boolean.parseBoolean(configs.getOrDefault("upwardStep", "true")));
+        break;
+
+      case "turbulence":
+        series = getSteadySeriesWithTurbulentPeriod(n,
+          Double.parseDouble(configs.getOrDefault("steadyStateValue", "10")),
+          Double.parseDouble(configs.getOrDefault("steadyStateDeviationPercentage", "0")),
+          Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationLowerPercentage", "0")),
+          Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationHigherPercentage", "10")),
+          Integer.parseInt(configs.getOrDefault("turbulentPeriodLength", "0")),
+          Integer.parseInt(configs.getOrDefault("turbulentStatePosition", "0")));
+        break;
+
+      default:
+        series = createNormalSeries(n,
+          0,
+          1,
+          0,
+          0,
+          0,
+          true);
+    }
+    return series;
+  }
+
+  public static AbstractMetricSeries generateSeries(String type, Map<String, String> configs) {
+
+    AbstractMetricSeries series;
+    switch (type) {
+
+      case "normal":
+        series = new NormalMetricSeries(Double.parseDouble(configs.getOrDefault("mean", "0")),
+          Double.parseDouble(configs.getOrDefault("sd", "1")),
+          Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesLower", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesHigher", "0")),
+          Boolean.parseBoolean(configs.getOrDefault("outlierOnRightEnd", "true")));
+        break;
+
+      case "uniform":
+        series = new UniformMetricSeries(
+          Double.parseDouble(configs.getOrDefault("value", "10")),
+          Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")),
+          Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true")));
+        break;
+
+      case "monotonic":
+        series = new MonotonicMetricSeries(
+          Double.parseDouble(configs.getOrDefault("startValue", "10")),
+          Double.parseDouble(configs.getOrDefault("slope", "0")),
+          Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")),
+          Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")),
+          Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true")));
+        break;
+
+      case "dualband":
+        series = new DualBandMetricSeries(
+          Double.parseDouble(configs.getOrDefault("lowBandValue", "10")),
+          Double.parseDouble(configs.getOrDefault("lowBandDeviationPercentage", "0")),
+          Integer.parseInt(configs.getOrDefault("lowBandPeriodSize", "0")),
+          Double.parseDouble(configs.getOrDefault("highBandValue", "10")),
+          Double.parseDouble(configs.getOrDefault("highBandDeviationPercentage", "0")),
+          Integer.parseInt(configs.getOrDefault("highBandPeriodSize", "0")));
+        break;
+
+      case "step":
+        series = new StepFunctionMetricSeries(
+          Double.parseDouble(configs.getOrDefault("startValue", "10")),
+          Double.parseDouble(configs.getOrDefault("steadyValueDeviationPercentage", "0")),
+          Double.parseDouble(configs.getOrDefault("steadyPeriodSlope", "0")),
+          Integer.parseInt(configs.getOrDefault("steadyPeriodMinSize", "0")),
+          Integer.parseInt(configs.getOrDefault("steadyPeriodMaxSize", "0")),
+          Double.parseDouble(configs.getOrDefault("stepChangePercentage", "0")),
+          Boolean.parseBoolean(configs.getOrDefault("upwardStep", "true")));
+        break;
+
+      case "turbulence":
+        series = new SteadyWithTurbulenceMetricSeries(
+          Integer.parseInt(configs.getOrDefault("approxSeriesLength", "100")),
+          Double.parseDouble(configs.getOrDefault("steadyStateValue", "10")),
+          Double.parseDouble(configs.getOrDefault("steadyStateDeviationPercentage", "0")),
+          Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationLowerPercentage", "0")),
+          Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationHigherPercentage", "10")),
+          Integer.parseInt(configs.getOrDefault("turbulentPeriodLength", "0")),
+          Integer.parseInt(configs.getOrDefault("turbulentStatePosition", "0")));
+        break;
+
+      default:
+        series = new NormalMetricSeries(0,
+          1,
+          0,
+          0,
+          0,
+          true);
+    }
+    return series;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java
new file mode 100644
index 0000000..a883d08
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+public class MonotonicMetricSeries implements AbstractMetricSeries {
+
+  double startValue = 0.0;
+  double slope = 0.5;
+  double deviationPercentage = 0.0;
+  double outlierProbability = 0.0;
+  double outlierDeviationLowerPercentage = 0.0;
+  double outlierDeviationHigherPercentage = 0.0;
+  boolean outliersAboveValue = true;
+
+  Random random = new Random();
+  double nonOutlierProbability;
+
+  // y = mx + c
+  double y;
+  double m;
+  double x;
+  double c;
+
+  public MonotonicMetricSeries(double startValue,
+                               double slope,
+                               double deviationPercentage,
+                               double outlierProbability,
+                               double outlierDeviationLowerPercentage,
+                               double outlierDeviationHigherPercentage,
+                               boolean outliersAboveValue) {
+    this.startValue = startValue;
+    this.slope = slope;
+    this.deviationPercentage = deviationPercentage;
+    this.outlierProbability = outlierProbability;
+    this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage;
+    this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage;
+    this.outliersAboveValue = outliersAboveValue;
+    init();
+  }
+
+  private void init() {
+    y = startValue;
+    m = slope;
+    x = 1;
+    c = y - (m * x);
+    nonOutlierProbability = 1.0 - outlierProbability;
+  }
+
+  @Override
+  public double nextValue() {
+
+    double value;
+    double probability = random.nextDouble();
+
+    y = m * x + c;
+    if (probability <= nonOutlierProbability) {
+      double valueDeviationLowerLimit = y - deviationPercentage * y;
+      double valueDeviationHigherLimit = y + deviationPercentage * y;
+      value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
+    } else {
+      if (outliersAboveValue) {
+        double outlierLowerLimit = y + outlierDeviationLowerPercentage * y;
+        double outlierUpperLimit = y + outlierDeviationHigherPercentage * y;
+        value = outlierLowerLimit + (outlierUpperLimit - outlierLowerLimit) * random.nextDouble();
+      } else {
+        double outlierLowerLimit = y - outlierDeviationLowerPercentage * y;
+        double outlierUpperLimit = y - outlierDeviationHigherPercentage * y;
+        value = outlierUpperLimit + (outlierLowerLimit - outlierUpperLimit) * random.nextDouble();
+      }
+    }
+    x++;
+    return value;
+  }
+
+  @Override
+  public double[] getSeries(int n) {
+    double[] series = new double[n];
+    for (int i = 0; i < n; i++) {
+      series[i] = nextValue();
+    }
+    return series;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java
new file mode 100644
index 0000000..cc83d2c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+public class NormalMetricSeries implements AbstractMetricSeries {
+
+  double mean = 0.0;
+  double sd = 1.0;
+  double outlierProbability = 0.0;
+  double outlierDeviationSDTimesLower = 0.0;
+  double outlierDeviationSDTimesHigher = 0.0;
+  boolean outlierOnRightEnd = true;
+
+  Random random = new Random();
+  double nonOutlierProbability;
+
+
+  public NormalMetricSeries(double mean,
+                            double sd,
+                            double outlierProbability,
+                            double outlierDeviationSDTimesLower,
+                            double outlierDeviationSDTimesHigher,
+                            boolean outlierOnRightEnd) {
+    this.mean = mean;
+    this.sd = sd;
+    this.outlierProbability = outlierProbability;
+    this.outlierDeviationSDTimesLower = outlierDeviationSDTimesLower;
+    this.outlierDeviationSDTimesHigher = outlierDeviationSDTimesHigher;
+    this.outlierOnRightEnd = outlierOnRightEnd;
+    init();
+  }
+
+  private void init() {
+    nonOutlierProbability = 1.0 - outlierProbability;
+  }
+
+  @Override
+  public double nextValue() {
+
+    double value;
+    double probability = random.nextDouble();
+
+    if (probability <= nonOutlierProbability) {
+      value = random.nextGaussian() * sd + mean;
+    } else {
+      if (outlierOnRightEnd) {
+        value = mean + (outlierDeviationSDTimesLower + (outlierDeviationSDTimesHigher - outlierDeviationSDTimesLower) * random.nextDouble()) * sd;
+      } else {
+        value = mean - (outlierDeviationSDTimesLower + (outlierDeviationSDTimesHigher - outlierDeviationSDTimesLower) * random.nextDouble()) * sd;
+      }
+    }
+    return value;
+  }
+
+  @Override
+  public double[] getSeries(int n) {
+    double[] series = new double[n];
+    for (int i = 0; i < n; i++) {
+      series[i] = nextValue();
+    }
+    return series;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java
new file mode 100644
index 0000000..c4ed3ba
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+public class SteadyWithTurbulenceMetricSeries implements AbstractMetricSeries {
+
+  double steadyStateValue = 0.0;
+  double steadyStateDeviationPercentage = 0.0;
+  double turbulentPeriodDeviationLowerPercentage = 0.3;
+  double turbulentPeriodDeviationHigherPercentage = 0.5;
+  int turbulentPeriodLength = 5;
+  int turbulentStatePosition = 1;
+  int approximateSeriesLength = 10;
+
+  Random random = new Random();
+  double valueDeviationLowerLimit;
+  double valueDeviationHigherLimit;
+  double tPeriodLowerLimit;
+  double tPeriodUpperLimit;
+  int tPeriodStartIndex = 0;
+  int index = 0;
+
+  public SteadyWithTurbulenceMetricSeries(int approximateSeriesLength,
+                                          double steadyStateValue,
+                                          double steadyStateDeviationPercentage,
+                                          double turbulentPeriodDeviationLowerPercentage,
+                                          double turbulentPeriodDeviationHigherPercentage,
+                                          int turbulentPeriodLength,
+                                          int turbulentStatePosition) {
+    this.approximateSeriesLength = approximateSeriesLength;
+    this.steadyStateValue = steadyStateValue;
+    this.steadyStateDeviationPercentage = steadyStateDeviationPercentage;
+    this.turbulentPeriodDeviationLowerPercentage = turbulentPeriodDeviationLowerPercentage;
+    this.turbulentPeriodDeviationHigherPercentage = turbulentPeriodDeviationHigherPercentage;
+    this.turbulentPeriodLength = turbulentPeriodLength;
+    this.turbulentStatePosition = turbulentStatePosition;
+    init();
+  }
+
+  private void init() {
+
+    if (turbulentStatePosition == 1) {
+      tPeriodStartIndex = (int) (0.25 * approximateSeriesLength + (0.25 * approximateSeriesLength * random.nextDouble()));
+    } else if (turbulentStatePosition == 2) {
+      tPeriodStartIndex = approximateSeriesLength - turbulentPeriodLength;
+    }
+
+    valueDeviationLowerLimit = steadyStateValue - steadyStateDeviationPercentage * steadyStateValue;
+    valueDeviationHigherLimit = steadyStateValue + steadyStateDeviationPercentage * steadyStateValue;
+
+    tPeriodLowerLimit = steadyStateValue + turbulentPeriodDeviationLowerPercentage * steadyStateValue;
+    tPeriodUpperLimit = steadyStateValue + turbulentPeriodDeviationHigherPercentage * steadyStateValue;
+  }
+
+  @Override
+  public double nextValue() {
+
+    double value;
+
+    if (index >= tPeriodStartIndex && index <= (tPeriodStartIndex + turbulentPeriodLength)) {
+      value = tPeriodLowerLimit + (tPeriodUpperLimit - tPeriodLowerLimit) * random.nextDouble();
+    } else {
+      value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
+    }
+    index++;
+    return value;
+  }
+
+  @Override
+  public double[] getSeries(int n) {
+
+    double[] series = new double[n];
+    int turbulentPeriodStartIndex = 0;
+
+    if (turbulentStatePosition == 1) {
+      turbulentPeriodStartIndex = (int) (0.25 * n + (0.25 * n * random.nextDouble()));
+    } else if (turbulentStatePosition == 2) {
+      turbulentPeriodStartIndex = n - turbulentPeriodLength;
+    }
+
+    double valueDevLowerLimit = steadyStateValue - steadyStateDeviationPercentage * steadyStateValue;
+    double valueDevHigherLimit = steadyStateValue + steadyStateDeviationPercentage * steadyStateValue;
+
+    double turbulentPeriodLowerLimit = steadyStateValue + turbulentPeriodDeviationLowerPercentage * steadyStateValue;
+    double turbulentPeriodUpperLimit = steadyStateValue + turbulentPeriodDeviationHigherPercentage * steadyStateValue;
+
+    for (int i = 0; i < n; i++) {
+      if (i >= turbulentPeriodStartIndex && i < (turbulentPeriodStartIndex + turbulentPeriodLength)) {
+        series[i] = turbulentPeriodLowerLimit + (turbulentPeriodUpperLimit - turbulentPeriodLowerLimit) * random.nextDouble();
+      } else {
+        series[i] = valueDevLowerLimit + (valueDevHigherLimit - valueDevLowerLimit) * random.nextDouble();
+      }
+    }
+
+    return series;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
new file mode 100644
index 0000000..d5beb48
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+public class StepFunctionMetricSeries implements AbstractMetricSeries {
+
+  double startValue = 0.0;
+  double steadyValueDeviationPercentage = 0.0;
+  double steadyPeriodSlope = 0.5;
+  int steadyPeriodMinSize = 10;
+  int steadyPeriodMaxSize = 20;
+  double stepChangePercentage = 0.0;
+  boolean upwardStep = true;
+
+  Random random = new Random();
+
+  // y = mx + c
+  double y;
+  double m;
+  double x;
+  double c;
+  int currentStepSize;
+  int currentIndex;
+
+  public StepFunctionMetricSeries(double startValue,
+                                  double steadyValueDeviationPercentage,
+                                  double steadyPeriodSlope,
+                                  int steadyPeriodMinSize,
+                                  int steadyPeriodMaxSize,
+                                  double stepChangePercentage,
+                                  boolean upwardStep) {
+    this.startValue = startValue;
+    this.steadyValueDeviationPercentage = steadyValueDeviationPercentage;
+    this.steadyPeriodSlope = steadyPeriodSlope;
+    this.steadyPeriodMinSize = steadyPeriodMinSize;
+    this.steadyPeriodMaxSize = steadyPeriodMaxSize;
+    this.stepChangePercentage = stepChangePercentage;
+    this.upwardStep = upwardStep;
+    init();
+  }
+
+  private void init() {
+    y = startValue;
+    m = steadyPeriodSlope;
+    x = 1;
+    c = y - (m * x);
+
+    currentStepSize = (int) (steadyPeriodMinSize + (steadyPeriodMaxSize - steadyPeriodMinSize) * random.nextDouble());
+    currentIndex = 0;
+  }
+
+  @Override
+  public double nextValue() {
+
+    double value = 0.0;
+
+    if (currentIndex < currentStepSize) {
+      y = m * x + c;
+      double valueDeviationLowerLimit = y - steadyValueDeviationPercentage * y;
+      double valueDeviationHigherLimit = y + steadyValueDeviationPercentage * y;
+      value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
+      x++;
+      currentIndex++;
+    }
+
+    if (currentIndex == currentStepSize) {
+      currentIndex = 0;
+      currentStepSize = (int) (steadyPeriodMinSize + (steadyPeriodMaxSize - steadyPeriodMinSize) * random.nextDouble());
+      if (upwardStep) {
+        y = y + stepChangePercentage * y;
+      } else {
+        y = y - stepChangePercentage * y;
+      }
+      x = 1;
+      c = y - (m * x);
+    }
+
+    return value;
+  }
+
+  @Override
+  public double[] getSeries(int n) {
+    double[] series = new double[n];
+    for (int i = 0; i < n; i++) {
+      series[i] = nextValue();
+    }
+    return series;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
new file mode 100644
index 0000000..a2b0eea
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+public class UniformMetricSeries implements AbstractMetricSeries {
+
+  double value = 0.0;
+  double deviationPercentage = 0.0;
+  double outlierProbability = 0.0;
+  double outlierDeviationLowerPercentage = 0.0;
+  double outlierDeviationHigherPercentage = 0.0;
+  boolean outliersAboveValue= true;
+
+  Random random = new Random();
+  double valueDeviationLowerLimit;
+  double valueDeviationHigherLimit;
+  double outlierLeftLowerLimit;
+  double outlierLeftHigherLimit;
+  double outlierRightLowerLimit;
+  double outlierRightUpperLimit;
+  double nonOutlierProbability;
+
+
+  public UniformMetricSeries(double value,
+                             double deviationPercentage,
+                             double outlierProbability,
+                             double outlierDeviationLowerPercentage,
+                             double outlierDeviationHigherPercentage,
+                             boolean outliersAboveValue) {
+    this.value = value;
+    this.deviationPercentage = deviationPercentage;
+    this.outlierProbability = outlierProbability;
+    this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage;
+    this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage;
+    this.outliersAboveValue = outliersAboveValue;
+    init();
+  }
+
+  private void init() {
+    valueDeviationLowerLimit = value - deviationPercentage * value;
+    valueDeviationHigherLimit = value + deviationPercentage * value;
+
+    outlierLeftLowerLimit = value - outlierDeviationHigherPercentage * value;
+    outlierLeftHigherLimit = value - outlierDeviationLowerPercentage * value;
+    outlierRightLowerLimit = value + outlierDeviationLowerPercentage * value;
+    outlierRightUpperLimit = value + outlierDeviationHigherPercentage * value;
+
+    nonOutlierProbability = 1.0 - outlierProbability;
+  }
+
+  @Override
+  public double nextValue() {
+
+    double value;
+    double probability = random.nextDouble();
+
+    if (probability <= nonOutlierProbability) {
+      value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
+    } else {
+      if (!outliersAboveValue) {
+        value = outlierLeftLowerLimit + (outlierLeftHigherLimit - outlierLeftLowerLimit) * random.nextDouble();
+      } else {
+        value = outlierRightLowerLimit + (outlierRightUpperLimit - outlierRightLowerLimit) * random.nextDouble();
+      }
+    }
+    return value;
+  }
+
+  @Override
+  public double[] getSeries(int n) {
+    double[] series = new double[n];
+    for (int i = 0; i < n; i++) {
+      series[i] = nextValue();
+    }
+    return series;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/ema.R
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/ema.R b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/ema.R
new file mode 100644
index 0000000..0b66095
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/ema.R
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#  EMA <- w * EMA + (1 - w) * x
+# EMS <- sqrt( w * EMS^2 + (1 - w) * (x - EMA)^2 )
+# Alarm = abs(x - EMA) > n * EMS
+
+ema_global <- function(train_data, test_data, w, n) {
+  
+#  res <- get_data(url)
+#  data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+#  names(data) <- c("TS", res$metrics[[1]]$metricname)
+#  train_data <- data[which(data$TS >= train_start & data$TS <= train_end), 2]
+#  test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+  
+  anomalies <- data.frame()
+  ema <- 0
+  ems <- 0
+
+  #Train Step
+  for (x in train_data) {
+    ema <- w*ema + (1-w)*x
+    ems <- sqrt(w* ems^2 + (1 - w)*(x - ema)^2)
+  }
+  
+  for ( i in 1:length(test_data[,1])) {
+    x <- test_data[i,2]
+    if (abs(x - ema) > n*ems) {
+      anomaly <- c(as.numeric(test_data[i,1]), x)
+      # print (anomaly)
+      anomalies <- rbind(anomalies, anomaly)
+    }
+    ema <- w*ema + (1-w)*x
+    ems <- sqrt(w* ems^2 + (1 - w)*(x - ema)^2)
+  }
+  
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Value")
+  }
+  return (anomalies)
+}
+
+ema_daily <- function(train_data, test_data, w, n) {
+  
+#  res <- get_data(url)
+#  data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+#  names(data) <- c("TS", res$metrics[[1]]$metricname)
+#  train_data <- data[which(data$TS >= train_start & data$TS <= train_end), ]
+#  test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+  
+  anomalies <- data.frame()
+  ema <- vector("numeric", 7)
+  ems <- vector("numeric", 7)
+  
+  #Train Step
+  for ( i in 1:length(train_data[,1])) {
+    x <- train_data[i,2]
+    time <- as.POSIXlt(as.numeric(train_data[i,1])/1000, origin = "1970-01-01" ,tz = "GMT")
+    index <- time$wday
+    ema[index] <- w*ema[index] + (1-w)*x
+    ems[index] <- sqrt(w* ems[index]^2 + (1 - w)*(x - ema[index])^2)
+  }
+  
+  for ( i in 1:length(test_data[,1])) {
+    x <- test_data[i,2]
+    time <- as.POSIXlt(as.numeric(test_data[i,1])/1000, origin = "1970-01-01" ,tz = "GMT")
+    index <- time$wday
+    
+    if (abs(x - ema[index+1]) > n*ems[index+1]) {
+      anomaly <- c(as.numeric(test_data[i,1]), x)
+      # print (anomaly)
+      anomalies <- rbind(anomalies, anomaly)
+    }
+    ema[index+1] <- w*ema[index+1] + (1-w)*x
+    ems[index+1] <- sqrt(w* ems[index+1]^2 + (1 - w)*(x - ema[index+1])^2)
+  }
+  
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Value")
+  }
+  return(anomalies)
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/hsdev.r
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/hsdev.r b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/hsdev.r
new file mode 100644
index 0000000..bca3366
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/hsdev.r
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval, period) {
+
+  #res <- get_data(url)
+  #data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+  #names(data) <- c("TS", res$metrics[[1]]$metricname)
+  anomalies <- data.frame()
+
+  granularity <- train_data[2,1] - train_data[1,1]
+  test_start <- test_data[1,1]
+  test_end <- test_data[length(test_data[1,]),1]
+  train_start <- test_start - num_historic_periods*period
+  # round to start of day
+  train_start <- train_start - (train_start %% interval)
+
+  time <- as.POSIXlt(as.numeric(test_data[1,1])/1000, origin = "1970-01-01" ,tz = "GMT")
+  test_data_day <- time$wday
+
+  h_data <- c()
+  for ( i in length(train_data[,1]):1) {
+    ts <- train_data[i,1]
+    if ( ts < train_start) {
+      break
+    }
+    time <- as.POSIXlt(as.numeric(ts)/1000, origin = "1970-01-01" ,tz = "GMT")
+    if (time$wday == test_data_day) {
+      x <- train_data[i,2]
+      h_data <- c(h_data, x)
+    }
+  }
+
+  if (length(h_data) < 2*length(test_data[,1])) {
+    cat ("\nNot enough training data")
+    return (anomalies)
+  }
+
+  past_median <- median(h_data)
+  past_sd <- sd(h_data)
+  curr_median <- median(test_data[,2])
+
+  if (abs(curr_median - past_median) > n * past_sd) {
+    anomaly <- c(test_start, test_end, curr_median, past_median, past_sd)
+    anomalies <- rbind(anomalies, anomaly)
+  }
+
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", "Past SD")
+  }
+
+  return (anomalies)
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/iforest.R
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/iforest.R b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/iforest.R
new file mode 100644
index 0000000..8956400
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/iforest.R
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ams_iforest <- function(url, train_start, train_end, test_start, test_end, threshold_score) {
+  
+  res <- get_data(url)
+  num_metrics <- length(res$metrics)
+  anomalies <- data.frame()
+  
+  metricname <- res$metrics[[1]]$metricname
+  data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+  names(data) <- c("TS", res$metrics[[1]]$metricname)
+
+  for (i in 2:num_metrics) {
+    metricname <- res$metrics[[i]]$metricname
+    df <- data.frame(as.numeric(names(res$metrics[[i]]$metrics)), as.numeric(res$metrics[[i]]$metrics))
+    names(df) <- c("TS", res$metrics[[i]]$metricname)
+    data <- merge(data, df)
+  }
+  
+  algo_data <- data[ which(df$TS >= train_start & df$TS <= train_end) , ][c(1:num_metrics+1)]
+  iForest <- IsolationTrees(algo_data)
+  test_data <- data[ which(df$TS >= test_start & df$TS <= test_end) , ]
+  
+  if_res <- AnomalyScore(test_data[c(1:num_metrics+1)], iForest)
+  for (i in 1:length(if_res$outF)) {
+    index <- test_start+i-1
+    if (if_res$outF[i] > threshold_score) {
+      anomaly <- c(test_data[i,1], if_res$outF[i], if_res$pathLength[i])
+      anomalies <- rbind(anomalies, anomaly)
+    } 
+  }
+  
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Anomaly Score", "Path length")
+  }
+  return (anomalies)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/kstest.r
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/kstest.r b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/kstest.r
new file mode 100644
index 0000000..f22bc15
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/kstest.r
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ams_ks <- function(train_data, test_data, p_value) {
+  
+#  res <- get_data(url)
+#  data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+#  names(data) <- c("TS", res$metrics[[1]]$metricname)
+#  train_data <- data[which(data$TS >= train_start & data$TS <= train_end), 2]
+#  test_data <- data[which(data$TS >= test_start & data$TS <= test_end), 2]
+  
+  anomalies <- data.frame()
+  res <- ks.test(train_data[,2], test_data[,2])
+  
+  if (res[2] < p_value) {
+    anomaly <- c(test_data[1,1], test_data[length(test_data),1], res[1], res[2])
+    anomalies <- rbind(anomalies, anomaly)
+  }
+ 
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS Start", "TS End", "D", "p-value")
+  }
+  return (anomalies)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/test.R
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/test.R b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/test.R
new file mode 100644
index 0000000..7650356
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/test.R
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+tukeys_anomalies <- data.frame()
+ema_global_anomalies <- data.frame()
+ema_daily_anomalies <- data.frame()
+ks_anomalies <- data.frame()
+hsdev_anomalies <- data.frame()
+
+init <- function() {
+  tukeys_anomalies <- data.frame()
+  ema_global_anomalies <- data.frame()
+  ema_daily_anomalies <- data.frame()
+  ks_anomalies <- data.frame()
+  hsdev_anomalies <- data.frame()
+}
+
+test_methods <- function(data) {
+
+  init()
+  #res <- get_data(url)
+  #data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+  #names(data) <- c("TS", res$metrics[[1]]$metricname)
+
+  limit <- data[length(data[,1]),1]
+  step <- data[2,1] - data[1,1]
+
+  train_start <- data[1,1]
+  train_end <- get_next_day_boundary(train_start, step, limit)
+  test_start <- train_end + step
+  test_end <- get_next_day_boundary(test_start, step, limit)
+  i <- 1
+  day <- 24*60*60*1000
+
+  while (test_start < limit) {
+
+    print (i)
+    i <- i + 1
+    train_data <- data[which(data$TS >= train_start & data$TS <= train_end),]
+    test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+
+    #tukeys_anomalies <<- rbind(tukeys_anomalies, ams_tukeys(train_data, test_data, 3))
+    #ema_global_anomalies <<- rbind(ema_global_anomalies, ema_global(train_data, test_data, 0.9, 3))
+    #ema_daily_anomalies <<- rbind(ema_daily_anomalies, ema_daily(train_data, test_data, 0.9, 3))
+    #ks_anomalies <<- rbind(ks_anomalies, ams_ks(train_data, test_data, 0.05))
+    hsdev_train_data <- data[which(data$TS < test_start),]
+    hsdev_anomalies <<- rbind(hsdev_anomalies, hsdev_daily(hsdev_train_data, test_data, 3, 3, day, 7*day))
+
+    train_start <- test_start
+    train_end <- get_next_day_boundary(train_start, step, limit)
+    test_start <- train_end + step
+    test_end <- get_next_day_boundary(test_start, step, limit)
+  }
+  return (hsdev_anomalies)
+}
+
+get_next_day_boundary <- function(start, step, limit) {
+
+  if (start > limit) {
+    return (-1)
+  }
+
+  while (start <= limit) {
+    if (((start %% (24*60*60*1000)) - 28800000) == 0) {
+      return (start)
+    }
+    start <- start + step
+  }
+  return (start)
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/tukeys.r
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/tukeys.r b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/tukeys.r
new file mode 100644
index 0000000..0312226
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/tukeys.r
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+ams_tukeys <- function(train_data, test_data, n) {
+
+#  res <- get_data(url)
+#  data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+#  names(data) <- c("TS", res$metrics[[1]]$metricname)
+#  train_data <- data[which(data$TS >= train_start & data$TS <= train_end), 2]
+#  test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+
+  anomalies <- data.frame()
+  quantiles <- quantile(train_data[,2])
+  iqr <- quantiles[4] - quantiles[2]
+  niqr <- 0
+
+  for ( i in 1:length(test_data[,1])) {
+    x <- test_data[i,2]
+    lb <- quantiles[2] - n*iqr
+    ub <- quantiles[4] + n*iqr
+    if ( (x < lb)  || (x > ub) ) {
+      if (iqr != 0) {
+        if (x < lb) {
+          niqr <- (quantiles[2] - x) / iqr
+        } else {
+          niqr <- (x - quantiles[4]) / iqr
+        }
+      }
+        anomaly <- c(test_data[i,1], x, niqr)
+        anomalies <- rbind(anomalies, anomaly)
+      }
+  }
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Value", "niqr")
+  }
+  return (anomalies)
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/input-config.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/input-config.properties b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/input-config.properties
new file mode 100644
index 0000000..ab106c4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/input-config.properties
@@ -0,0 +1,42 @@
+# Copyright 2011 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+appIds=HOST
+
+collectorHost=localhost
+collectorPort=6188
+collectorProtocol=http
+
+zkQuorum=localhost:2181
+
+ambariServerHost=localhost
+clusterName=c1
+
+emaW=0.8
+emaN=3
+tukeysN=3
+pointInTimeTestInterval=300000
+pointInTimeTrainInterval=900000
+
+ksTestInterval=600000
+ksTrainInterval=600000
+hsdevNhp=3
+hsdevInterval=1800000;
+
+skipMetricPatterns=sdisk*,cpu_sintr*,proc*,disk*,boottime
+hosts=avijayan-ad-1.openstacklocal
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
new file mode 100644
index 0000000..324058b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.spark
+
+
+import java.io.{FileInputStream, IOException, InputStream}
+import java.util
+import java.util.Properties
+import java.util.logging.LogManager
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.ambari.metrics.alertservice.prototype.core.MetricsCollectorInterface
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.kafka._
+import org.apache.ambari.metrics.alertservice.prototype.methods.{AnomalyDetectionTechnique, MetricAnomaly}
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.{EmaModelLoader, EmaTechnique}
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics
+import org.apache.log4j.Logger
+import org.apache.spark.storage.StorageLevel
+
+object MetricAnomalyDetector {
+
+  /*
+    Load current EMA model
+    Filter step - Check if anomaly
+    Collect / Write to AMS / Print.
+   */
+
+//  var brokers = "avijayan-ams-1.openstacklocal:2181,avijayan-ams-2.openstacklocal:2181,avijayan-ams-3.openstacklocal:2181"
+//  var groupId = "ambari-metrics-group"
+//  var topicName = "ambari-metrics-topic"
+//  var numThreads = 1
+//  val anomalyDetectionModels: Array[AnomalyDetectionTechnique] = Array[AnomalyDetectionTechnique]()
+//
+//  def readProperties(propertiesFile: String): Properties = try {
+//    val properties = new Properties
+//    var inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile)
+//    if (inputStream == null) inputStream = new FileInputStream(propertiesFile)
+//    properties.load(inputStream)
+//    properties
+//  } catch {
+//    case ioEx: IOException =>
+//      null
+//  }
+//
+//  def main(args: Array[String]): Unit = {
+//
+//    @transient
+//    lazy val log = org.apache.log4j.LogManager.getLogger("MetricAnomalyDetectorLogger")
+//
+//    if (args.length < 1) {
+//      System.err.println("Usage: MetricSparkConsumer <input-config-file>")
+//      System.exit(1)
+//    }
+//
+//    //Read properties
+//    val properties = readProperties(propertiesFile = args(0))
+//
+//    //Load EMA parameters - w, n
+//    val emaW = properties.getProperty("emaW").toDouble
+//    val emaN = properties.getProperty("emaN").toDouble
+//
+//    //collector info
+//    val collectorHost: String = properties.getProperty("collectorHost")
+//    val collectorPort: String = properties.getProperty("collectorPort")
+//    val collectorProtocol: String = properties.getProperty("collectorProtocol")
+//    val anomalyMetricPublisher = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort)
+//
+//    //Instantiate Kafka stream reader
+//    val sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector")
+//    val streamingContext = new StreamingContext(sparkConf, Duration(10000))
+//
+//    val topicsSet = topicName.toSet
+//    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
+////    val stream = KafkaUtils.createDirectStream()
+//
+//    val kafkaStream = KafkaUtils.createStream(streamingContext, zkQuorum, groupId, Map(topicName -> numThreads), StorageLevel.MEMORY_AND_DISK_SER_2)
+//    kafkaStream.print()
+//
+//    var timelineMetricsStream = kafkaStream.map( message => {
+//      val mapper = new ObjectMapper
+//      val metrics = mapper.readValue(message._2, classOf[TimelineMetrics])
+//      metrics
+//    })
+//    timelineMetricsStream.print()
+//
+//    var appMetricStream = timelineMetricsStream.map( timelineMetrics => {
+//      (timelineMetrics.getMetrics.get(0).getAppId, timelineMetrics)
+//    })
+//    appMetricStream.print()
+//
+//    var filteredAppMetricStream = appMetricStream.filter( appMetricTuple => {
+//      appIds.contains(appMetricTuple._1)
+//    } )
+//    filteredAppMetricStream.print()
+//
+//    filteredAppMetricStream.foreachRDD( rdd => {
+//      rdd.foreach( appMetricTuple => {
+//        val timelineMetrics = appMetricTuple._2
+//        logger.info("Received Metric (1): " + timelineMetrics.getMetrics.get(0).getMetricName)
+//        log.info("Received Metric (2): " + timelineMetrics.getMetrics.get(0).getMetricName)
+//        for (timelineMetric <- timelineMetrics.getMetrics) {
+//          var anomalies = emaModel.test(timelineMetric)
+//          anomalyMetricPublisher.publish(anomalies)
+//        }
+//      })
+//    })
+//
+//    streamingContext.start()
+//    streamingContext.awaitTermination()
+//  }
+  }

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
new file mode 100644
index 0000000..ccded6b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.spark
+
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+
+object SparkPhoenixReader {
+
+  def main(args: Array[String]) {
+
+    if (args.length < 6) {
+      System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>")
+      System.exit(1)
+    }
+
+    var metricName = args(0)
+    var appId = args(1)
+    var hostname = args(2)
+    var weight = args(3).toDouble
+    var timessdev = args(4).toInt
+    var phoenixConnectionString = args(5) //avijayan-ams-3.openstacklocal:61181:/ams-hbase-unsecure
+    var modelDir = args(6)
+
+    val conf = new SparkConf()
+    conf.set("spark.app.name", "AMSAnomalyModelBuilder")
+    //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077")
+
+    var sc = new SparkContext(conf)
+    val sqlContext = new SQLContext(sc)
+
+    val currentTime = System.currentTimeMillis()
+    val oneDayBack = currentTime - 24*60*60*1000
+
+    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString))
+    df.registerTempTable("METRIC_RECORD")
+    val result = sqlContext.sql("SELECT METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME, METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD " +
+      "WHERE METRIC_NAME = '" + metricName + "' AND HOSTNAME = '" + hostname + "' AND APP_ID = '" + appId + "' AND SERVER_TIME < " + currentTime + " AND SERVER_TIME > " + oneDayBack)
+
+    var metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double]
+    result.collect().foreach(
+      t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5))
+    )
+
+    //val seriesName = result.head().getString(0)
+    //val hostname = result.head().getString(1)
+    //val appId = result.head().getString(2)
+
+    val timelineMetric = new TimelineMetric()
+    timelineMetric.setMetricName(metricName)
+    timelineMetric.setAppId(appId)
+    timelineMetric.setHostName(hostname)
+    timelineMetric.setMetricValues(metricValues)
+
+    var emaModel = new EmaTechnique(weight, timessdev)
+    emaModel.test(timelineMetric)
+    emaModel.save(sc, modelDir)
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
new file mode 100644
index 0000000..a0b06e6
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.core.RFunctionInvoker;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+import java.util.TreeMap;
+
+import static org.apache.ambari.metrics.alertservice.prototype.TestRFunctionInvoker.getTS;
+
+public class TestEmaTechnique {
+
+  private static double[] ts;
+  private static String fullFilePath;
+
+  @BeforeClass
+  public static void init() throws URISyntaxException {
+
+    Assume.assumeTrue(System.getenv("R_HOME") != null);
+    ts = getTS(1000);
+    URL url = ClassLoader.getSystemResource("R-scripts");
+    fullFilePath = new File(url.toURI()).getAbsolutePath();
+    RFunctionInvoker.setScriptsDir(fullFilePath);
+  }
+
+  @Test
+  public void testEmaInitialization() {
+
+    EmaTechnique ema = new EmaTechnique(0.5, 3);
+    Assert.assertTrue(ema.getTrackedEmas().isEmpty());
+    Assert.assertTrue(ema.getStartingWeight() == 0.5);
+    Assert.assertTrue(ema.getStartTimesSdev() == 2);
+  }
+
+  @Test
+  public void testEma() {
+    EmaTechnique ema = new EmaTechnique(0.5, 3);
+
+    long now = System.currentTimeMillis();
+
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("M1");
+    metric1.setHostName("H1");
+    metric1.setStartTime(now - 1000);
+    metric1.setAppId("A1");
+    metric1.setInstanceId(null);
+    metric1.setType("Integer");
+
+    //Train
+    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    for (int i = 0; i < 50; i++) {
+      double metric = 20000 + Math.random();
+      metricValues.put(now - i * 100, metric);
+    }
+    metric1.setMetricValues(metricValues);
+    List<MetricAnomaly> anomalyList = ema.test(metric1);
+//    Assert.assertTrue(anomalyList.isEmpty());
+
+    metricValues = new TreeMap<Long, Double>();
+    for (int i = 0; i < 50; i++) {
+      double metric = 20000 + Math.random();
+      metricValues.put(now - i * 100, metric);
+    }
+    metric1.setMetricValues(metricValues);
+    anomalyList = ema.test(metric1);
+    Assert.assertTrue(!anomalyList.isEmpty());
+    int l1 = anomalyList.size();
+
+    Assert.assertTrue(ema.updateModel(metric1, false, 20));
+    anomalyList = ema.test(metric1);
+    int l2 = anomalyList.size();
+    Assert.assertTrue(l2 < l1);
+
+    Assert.assertTrue(ema.updateModel(metric1, true, 50));
+    anomalyList = ema.test(metric1);
+    int l3 = anomalyList.size();
+    Assert.assertTrue(l3 > l2 && l3 > l1);
+
+  }
+}