You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2017/09/27 21:04:52 UTC
[2/6] ambari git commit: AMBARI-22077 : Create maven module and
package structure for the anomaly detection engine. (avijayan)
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestSeriesInputRequest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestSeriesInputRequest.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestSeriesInputRequest.java
new file mode 100644
index 0000000..a424f8e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestSeriesInputRequest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.testing.utilities;
+
+import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Collections;
+import java.util.Map;
+
+@XmlRootElement
+public class TestSeriesInputRequest {
+
+ private String seriesName;
+ private String seriesType;
+ private Map<String, String> configs;
+
+ public TestSeriesInputRequest() {
+ }
+
+ public TestSeriesInputRequest(String seriesName, String seriesType, Map<String, String> configs) {
+ this.seriesName = seriesName;
+ this.seriesType = seriesType;
+ this.configs = configs;
+ }
+
+ public String getSeriesName() {
+ return seriesName;
+ }
+
+ public void setSeriesName(String seriesName) {
+ this.seriesName = seriesName;
+ }
+
+ public String getSeriesType() {
+ return seriesType;
+ }
+
+ public void setSeriesType(String seriesType) {
+ this.seriesType = seriesType;
+ }
+
+ public Map<String, String> getConfigs() {
+ return configs;
+ }
+
+ public void setConfigs(Map<String, String> configs) {
+ this.configs = configs;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ TestSeriesInputRequest anotherInput = (TestSeriesInputRequest)o;
+ return anotherInput.getSeriesName().equals(this.getSeriesName());
+ }
+
+ @Override
+ public int hashCode() {
+ return seriesName.hashCode();
+ }
+
+ public static void main(String[] args) {
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ TestSeriesInputRequest testSeriesInputRequest = new TestSeriesInputRequest("test", "ema", Collections.singletonMap("key","value"));
+ try {
+ System.out.print(objectMapper.writeValueAsString(testSeriesInputRequest));
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
new file mode 100644
index 0000000..a8e31bf
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
/**
 * A source of synthetic metric values that can be read one point at a time or in batches.
 *
 * <p>The implementations in this package keep internal state, so every call to
 * {@link #nextValue()} advances the series.
 */
public interface AbstractMetricSeries {

  /**
   * Returns the next value of the series and advances its state.
   *
   * @return the next value
   */
  double nextValue();

  /**
   * Returns the next {@code n} values of the series, in order.
   *
   * <p>The default implementation simply calls {@link #nextValue()} {@code n} times;
   * implementations may override it.
   *
   * @param n number of values to generate
   * @return a new array of length {@code n}
   * @throws NegativeArraySizeException if {@code n} is negative
   */
  default double[] getSeries(int n) {
    double[] series = new double[n];
    for (int i = 0; i < n; i++) {
      series[i] = nextValue();
    }
    return series;
  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java
new file mode 100644
index 0000000..4158ff4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+public class DualBandMetricSeries implements AbstractMetricSeries {
+
+ double lowBandValue = 0.0;
+ double lowBandDeviationPercentage = 0.0;
+ int lowBandPeriodSize = 10;
+ double highBandValue = 1.0;
+ double highBandDeviationPercentage = 0.0;
+ int highBandPeriodSize = 10;
+
+ Random random = new Random();
+ double lowBandValueLowerLimit, lowBandValueHigherLimit;
+ double highBandLowerLimit, highBandUpperLimit;
+ int l = 0, h = 0;
+
+ public DualBandMetricSeries(double lowBandValue,
+ double lowBandDeviationPercentage,
+ int lowBandPeriodSize,
+ double highBandValue,
+ double highBandDeviationPercentage,
+ int highBandPeriodSize) {
+ this.lowBandValue = lowBandValue;
+ this.lowBandDeviationPercentage = lowBandDeviationPercentage;
+ this.lowBandPeriodSize = lowBandPeriodSize;
+ this.highBandValue = highBandValue;
+ this.highBandDeviationPercentage = highBandDeviationPercentage;
+ this.highBandPeriodSize = highBandPeriodSize;
+ init();
+ }
+
+ private void init() {
+ lowBandValueLowerLimit = lowBandValue - lowBandDeviationPercentage * lowBandValue;
+ lowBandValueHigherLimit = lowBandValue + lowBandDeviationPercentage * lowBandValue;
+ highBandLowerLimit = highBandValue - highBandDeviationPercentage * highBandValue;
+ highBandUpperLimit = highBandValue + highBandDeviationPercentage * highBandValue;
+ }
+
+ @Override
+ public double nextValue() {
+
+ double value = 0.0;
+
+ if (l < lowBandPeriodSize) {
+ value = lowBandValueLowerLimit + (lowBandValueHigherLimit - lowBandValueLowerLimit) * random.nextDouble();
+ l++;
+ } else if (h < highBandPeriodSize) {
+ value = highBandLowerLimit + (highBandUpperLimit - highBandLowerLimit) * random.nextDouble();
+ h++;
+ }
+
+ if (l == lowBandPeriodSize && h == highBandPeriodSize) {
+ l = 0;
+ h = 0;
+ }
+
+ return value;
+ }
+
+ @Override
+ public double[] getSeries(int n) {
+ double[] series = new double[n];
+ for (int i = 0; i < n; i++) {
+ series[i] = nextValue();
+ }
+ return series;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java
new file mode 100644
index 0000000..1e37ff3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Random;
+
+public class MetricSeriesGeneratorFactory {
+
+ /**
+ * Return a normally distributed data series with some deviation % and outliers.
+ *
+ * @param n size of the data series
+ * @param value The value around which the uniform data series is centered on.
+ * @param deviationPercentage The allowed deviation % on either side of the uniform value. For example, if value = 10, and deviation % is 0.1, the series values lie between 0.9 to 1.1.
+ * @param outlierProbability The probability of finding an outlier in the series.
+ * @param outlierDeviationLowerPercentage min percentage outlier should be away from the uniform value in % terms. if value = 10 and outlierDeviationPercentage = 30%, the outlier is 7 and 13.
+ * @param outlierDeviationHigherPercentage max percentage outlier should be away from the uniform value in % terms. if value = 10 and outlierDeviationPercentage = 60%, the outlier is 4 and 16.
+ * @param outliersAboveValue Outlier should be greater or smaller than the value.
+ * @return uniform series
+ */
+ public static double[] createUniformSeries(int n,
+ double value,
+ double deviationPercentage,
+ double outlierProbability,
+ double outlierDeviationLowerPercentage,
+ double outlierDeviationHigherPercentage,
+ boolean outliersAboveValue) {
+
+ UniformMetricSeries metricSeries = new UniformMetricSeries(value,
+ deviationPercentage,
+ outlierProbability,
+ outlierDeviationLowerPercentage,
+ outlierDeviationHigherPercentage,
+ outliersAboveValue);
+
+ return metricSeries.getSeries(n);
+ }
+
+
+ /**
+ * /**
+ * Returns a normally distributed series.
+ *
+ * @param n size of the data series
+ * @param mean mean of the distribution
+ * @param sd sd of the distribution
+ * @param outlierProbability sd of the distribution
+ * @param outlierDeviationSDTimesLower Lower Limit of the outlier with respect to times sdev from the mean.
+ * @param outlierDeviationSDTimesHigher Higher Limit of the outlier with respect to times sdev from the mean.
+ * @param outlierOnRightEnd Outlier should be on the right end or the left end.
+ * @return normal series
+ */
+ public static double[] createNormalSeries(int n,
+ double mean,
+ double sd,
+ double outlierProbability,
+ double outlierDeviationSDTimesLower,
+ double outlierDeviationSDTimesHigher,
+ boolean outlierOnRightEnd) {
+
+
+ NormalMetricSeries metricSeries = new NormalMetricSeries(mean,
+ sd,
+ outlierProbability,
+ outlierDeviationSDTimesLower,
+ outlierDeviationSDTimesHigher,
+ outlierOnRightEnd);
+
+ return metricSeries.getSeries(n);
+ }
+
+
+ /**
+ * Returns a monotonically increasing / decreasing series
+ *
+ * @param n size of the data series
+ * @param startValue Start value of the monotonic sequence
+ * @param slope direction of monotonicity m > 0 for increasing and m < 0 for decreasing.
+ * @param deviationPercentage The allowed deviation % on either side of the current 'y' value. For example, if current value = 10 according to slope, and deviation % is 0.1, the series values lie between 0.9 to 1.1.
+ * @param outlierProbability The probability of finding an outlier in the series.
+ * @param outlierDeviationLowerPercentage min percentage outlier should be away from the current 'y' value in % terms. if value = 10 and outlierDeviationPercentage = 30%, the outlier is 7 and 13.
+ * @param outlierDeviationHigherPercentage max percentage outlier should be away from the current 'y' value in % terms. if value = 10 and outlierDeviationPercentage = 60%, the outlier is 4 and 16.
+ * @param outliersAboveValue Outlier should be greater or smaller than the 'y' value.
+ * @return
+ */
+ public static double[] createMonotonicSeries(int n,
+ double startValue,
+ double slope,
+ double deviationPercentage,
+ double outlierProbability,
+ double outlierDeviationLowerPercentage,
+ double outlierDeviationHigherPercentage,
+ boolean outliersAboveValue) {
+
+ MonotonicMetricSeries metricSeries = new MonotonicMetricSeries(startValue,
+ slope,
+ deviationPercentage,
+ outlierProbability,
+ outlierDeviationLowerPercentage,
+ outlierDeviationHigherPercentage,
+ outliersAboveValue);
+
+ return metricSeries.getSeries(n);
+ }
+
+
+ /**
+ * Returns a dual band series (lower and higher)
+ *
+ * @param n size of the data series
+ * @param lowBandValue lower band value
+ * @param lowBandDeviationPercentage lower band deviation
+ * @param lowBandPeriodSize lower band
+ * @param highBandValue high band centre value
+ * @param highBandDeviationPercentage high band deviation.
+ * @param highBandPeriodSize high band size
+ * @return
+ */
+ public static double[] getDualBandSeries(int n,
+ double lowBandValue,
+ double lowBandDeviationPercentage,
+ int lowBandPeriodSize,
+ double highBandValue,
+ double highBandDeviationPercentage,
+ int highBandPeriodSize) {
+
+ DualBandMetricSeries metricSeries = new DualBandMetricSeries(lowBandValue,
+ lowBandDeviationPercentage,
+ lowBandPeriodSize,
+ highBandValue,
+ highBandDeviationPercentage,
+ highBandPeriodSize);
+
+ return metricSeries.getSeries(n);
+ }
+
+ /**
+ * Returns a step function series.
+ *
+ * @param n size of the data series
+ * @param startValue start steady value
+ * @param steadyValueDeviationPercentage required devation in the steady state value
+ * @param steadyPeriodSlope direction of monotonicity m > 0 for increasing and m < 0 for decreasing, m = 0 no increase or decrease.
+ * @param steadyPeriodMinSize min size for step period
+ * @param steadyPeriodMaxSize max size for step period.
+ * @param stepChangePercentage Increase / decrease in steady state to denote a step in terms of deviation percentage from the last value.
+ * @param upwardStep upward or downward step.
+ * @return
+ */
+ public static double[] getStepFunctionSeries(int n,
+ double startValue,
+ double steadyValueDeviationPercentage,
+ double steadyPeriodSlope,
+ int steadyPeriodMinSize,
+ int steadyPeriodMaxSize,
+ double stepChangePercentage,
+ boolean upwardStep) {
+
+ StepFunctionMetricSeries metricSeries = new StepFunctionMetricSeries(startValue,
+ steadyValueDeviationPercentage,
+ steadyPeriodSlope,
+ steadyPeriodMinSize,
+ steadyPeriodMaxSize,
+ stepChangePercentage,
+ upwardStep);
+
+ return metricSeries.getSeries(n);
+ }
+
+ /**
+ * Series with small period of turbulence and then back to steady.
+ *
+ * @param n size of the data series
+ * @param steadyStateValue steady state center value
+ * @param steadyStateDeviationPercentage steady state deviation in percentage
+ * @param turbulentPeriodDeviationLowerPercentage turbulent state lower limit in terms of percentage from centre value.
+ * @param turbulentPeriodDeviationHigherPercentage turbulent state higher limit in terms of percentage from centre value.
+ * @param turbulentPeriodLength turbulent period length (number of points)
+ * @param turbulentStatePosition Where the turbulent state should be 0 - at the beginning, 1 - in the middle (25% - 50% of the series), 2 - at the end of the series.
+ * @return
+ */
+ public static double[] getSteadySeriesWithTurbulentPeriod(int n,
+ double steadyStateValue,
+ double steadyStateDeviationPercentage,
+ double turbulentPeriodDeviationLowerPercentage,
+ double turbulentPeriodDeviationHigherPercentage,
+ int turbulentPeriodLength,
+ int turbulentStatePosition
+ ) {
+
+
+ SteadyWithTurbulenceMetricSeries metricSeries = new SteadyWithTurbulenceMetricSeries(n,
+ steadyStateValue,
+ steadyStateDeviationPercentage,
+ turbulentPeriodDeviationLowerPercentage,
+ turbulentPeriodDeviationHigherPercentage,
+ turbulentPeriodLength,
+ turbulentStatePosition);
+
+ return metricSeries.getSeries(n);
+ }
+
+
+ public static double[] generateSeries(String type, int n, Map<String, String> configs) {
+
+ double[] series;
+ switch (type) {
+
+ case "normal":
+ series = createNormalSeries(n,
+ Double.parseDouble(configs.getOrDefault("mean", "0")),
+ Double.parseDouble(configs.getOrDefault("sd", "1")),
+ Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesLower", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesHigher", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("outlierOnRightEnd", "true")));
+ break;
+
+ case "uniform":
+ series = createUniformSeries(n,
+ Double.parseDouble(configs.getOrDefault("value", "10")),
+ Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true")));
+ break;
+
+ case "monotonic":
+ series = createMonotonicSeries(n,
+ Double.parseDouble(configs.getOrDefault("startValue", "10")),
+ Double.parseDouble(configs.getOrDefault("slope", "0")),
+ Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true")));
+ break;
+
+ case "dualband":
+ series = getDualBandSeries(n,
+ Double.parseDouble(configs.getOrDefault("lowBandValue", "10")),
+ Double.parseDouble(configs.getOrDefault("lowBandDeviationPercentage", "0")),
+ Integer.parseInt(configs.getOrDefault("lowBandPeriodSize", "0")),
+ Double.parseDouble(configs.getOrDefault("highBandValue", "10")),
+ Double.parseDouble(configs.getOrDefault("highBandDeviationPercentage", "0")),
+ Integer.parseInt(configs.getOrDefault("highBandPeriodSize", "0")));
+ break;
+
+ case "step":
+ series = getStepFunctionSeries(n,
+ Double.parseDouble(configs.getOrDefault("startValue", "10")),
+ Double.parseDouble(configs.getOrDefault("steadyValueDeviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("steadyPeriodSlope", "0")),
+ Integer.parseInt(configs.getOrDefault("steadyPeriodMinSize", "0")),
+ Integer.parseInt(configs.getOrDefault("steadyPeriodMaxSize", "0")),
+ Double.parseDouble(configs.getOrDefault("stepChangePercentage", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("upwardStep", "true")));
+ break;
+
+ case "turbulence":
+ series = getSteadySeriesWithTurbulentPeriod(n,
+ Double.parseDouble(configs.getOrDefault("steadyStateValue", "10")),
+ Double.parseDouble(configs.getOrDefault("steadyStateDeviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationLowerPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationHigherPercentage", "10")),
+ Integer.parseInt(configs.getOrDefault("turbulentPeriodLength", "0")),
+ Integer.parseInt(configs.getOrDefault("turbulentStatePosition", "0")));
+ break;
+
+ default:
+ series = createNormalSeries(n,
+ 0,
+ 1,
+ 0,
+ 0,
+ 0,
+ true);
+ }
+ return series;
+ }
+
+ public static AbstractMetricSeries generateSeries(String type, Map<String, String> configs) {
+
+ AbstractMetricSeries series;
+ switch (type) {
+
+ case "normal":
+ series = new NormalMetricSeries(Double.parseDouble(configs.getOrDefault("mean", "0")),
+ Double.parseDouble(configs.getOrDefault("sd", "1")),
+ Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesLower", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesHigher", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("outlierOnRightEnd", "true")));
+ break;
+
+ case "uniform":
+ series = new UniformMetricSeries(
+ Double.parseDouble(configs.getOrDefault("value", "10")),
+ Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true")));
+ break;
+
+ case "monotonic":
+ series = new MonotonicMetricSeries(
+ Double.parseDouble(configs.getOrDefault("startValue", "10")),
+ Double.parseDouble(configs.getOrDefault("slope", "0")),
+ Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierProbability", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true")));
+ break;
+
+ case "dualband":
+ series = new DualBandMetricSeries(
+ Double.parseDouble(configs.getOrDefault("lowBandValue", "10")),
+ Double.parseDouble(configs.getOrDefault("lowBandDeviationPercentage", "0")),
+ Integer.parseInt(configs.getOrDefault("lowBandPeriodSize", "0")),
+ Double.parseDouble(configs.getOrDefault("highBandValue", "10")),
+ Double.parseDouble(configs.getOrDefault("highBandDeviationPercentage", "0")),
+ Integer.parseInt(configs.getOrDefault("highBandPeriodSize", "0")));
+ break;
+
+ case "step":
+ series = new StepFunctionMetricSeries(
+ Double.parseDouble(configs.getOrDefault("startValue", "10")),
+ Double.parseDouble(configs.getOrDefault("steadyValueDeviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("steadyPeriodSlope", "0")),
+ Integer.parseInt(configs.getOrDefault("steadyPeriodMinSize", "0")),
+ Integer.parseInt(configs.getOrDefault("steadyPeriodMaxSize", "0")),
+ Double.parseDouble(configs.getOrDefault("stepChangePercentage", "0")),
+ Boolean.parseBoolean(configs.getOrDefault("upwardStep", "true")));
+ break;
+
+ case "turbulence":
+ series = new SteadyWithTurbulenceMetricSeries(
+ Integer.parseInt(configs.getOrDefault("approxSeriesLength", "100")),
+ Double.parseDouble(configs.getOrDefault("steadyStateValue", "10")),
+ Double.parseDouble(configs.getOrDefault("steadyStateDeviationPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationLowerPercentage", "0")),
+ Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationHigherPercentage", "10")),
+ Integer.parseInt(configs.getOrDefault("turbulentPeriodLength", "0")),
+ Integer.parseInt(configs.getOrDefault("turbulentStatePosition", "0")));
+ break;
+
+ default:
+ series = new NormalMetricSeries(0,
+ 1,
+ 0,
+ 0,
+ 0,
+ true);
+ }
+ return series;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java
new file mode 100644
index 0000000..a883d08
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+public class MonotonicMetricSeries implements AbstractMetricSeries {
+
+ double startValue = 0.0;
+ double slope = 0.5;
+ double deviationPercentage = 0.0;
+ double outlierProbability = 0.0;
+ double outlierDeviationLowerPercentage = 0.0;
+ double outlierDeviationHigherPercentage = 0.0;
+ boolean outliersAboveValue = true;
+
+ Random random = new Random();
+ double nonOutlierProbability;
+
+ // y = mx + c
+ double y;
+ double m;
+ double x;
+ double c;
+
+ public MonotonicMetricSeries(double startValue,
+ double slope,
+ double deviationPercentage,
+ double outlierProbability,
+ double outlierDeviationLowerPercentage,
+ double outlierDeviationHigherPercentage,
+ boolean outliersAboveValue) {
+ this.startValue = startValue;
+ this.slope = slope;
+ this.deviationPercentage = deviationPercentage;
+ this.outlierProbability = outlierProbability;
+ this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage;
+ this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage;
+ this.outliersAboveValue = outliersAboveValue;
+ init();
+ }
+
+ private void init() {
+ y = startValue;
+ m = slope;
+ x = 1;
+ c = y - (m * x);
+ nonOutlierProbability = 1.0 - outlierProbability;
+ }
+
+ @Override
+ public double nextValue() {
+
+ double value;
+ double probability = random.nextDouble();
+
+ y = m * x + c;
+ if (probability <= nonOutlierProbability) {
+ double valueDeviationLowerLimit = y - deviationPercentage * y;
+ double valueDeviationHigherLimit = y + deviationPercentage * y;
+ value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
+ } else {
+ if (outliersAboveValue) {
+ double outlierLowerLimit = y + outlierDeviationLowerPercentage * y;
+ double outlierUpperLimit = y + outlierDeviationHigherPercentage * y;
+ value = outlierLowerLimit + (outlierUpperLimit - outlierLowerLimit) * random.nextDouble();
+ } else {
+ double outlierLowerLimit = y - outlierDeviationLowerPercentage * y;
+ double outlierUpperLimit = y - outlierDeviationHigherPercentage * y;
+ value = outlierUpperLimit + (outlierLowerLimit - outlierUpperLimit) * random.nextDouble();
+ }
+ }
+ x++;
+ return value;
+ }
+
+ @Override
+ public double[] getSeries(int n) {
+ double[] series = new double[n];
+ for (int i = 0; i < n; i++) {
+ series[i] = nextValue();
+ }
+ return series;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java
new file mode 100644
index 0000000..cc83d2c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+public class NormalMetricSeries implements AbstractMetricSeries {
+
+ double mean = 0.0;
+ double sd = 1.0;
+ double outlierProbability = 0.0;
+ double outlierDeviationSDTimesLower = 0.0;
+ double outlierDeviationSDTimesHigher = 0.0;
+ boolean outlierOnRightEnd = true;
+
+ Random random = new Random();
+ double nonOutlierProbability;
+
+
+ public NormalMetricSeries(double mean,
+ double sd,
+ double outlierProbability,
+ double outlierDeviationSDTimesLower,
+ double outlierDeviationSDTimesHigher,
+ boolean outlierOnRightEnd) {
+ this.mean = mean;
+ this.sd = sd;
+ this.outlierProbability = outlierProbability;
+ this.outlierDeviationSDTimesLower = outlierDeviationSDTimesLower;
+ this.outlierDeviationSDTimesHigher = outlierDeviationSDTimesHigher;
+ this.outlierOnRightEnd = outlierOnRightEnd;
+ init();
+ }
+
+ private void init() {
+ nonOutlierProbability = 1.0 - outlierProbability;
+ }
+
+ @Override
+ public double nextValue() {
+
+ double value;
+ double probability = random.nextDouble();
+
+ if (probability <= nonOutlierProbability) {
+ value = random.nextGaussian() * sd + mean;
+ } else {
+ if (outlierOnRightEnd) {
+ value = mean + (outlierDeviationSDTimesLower + (outlierDeviationSDTimesHigher - outlierDeviationSDTimesLower) * random.nextDouble()) * sd;
+ } else {
+ value = mean - (outlierDeviationSDTimesLower + (outlierDeviationSDTimesHigher - outlierDeviationSDTimesLower) * random.nextDouble()) * sd;
+ }
+ }
+ return value;
+ }
+
+ @Override
+ public double[] getSeries(int n) {
+ double[] series = new double[n];
+ for (int i = 0; i < n; i++) {
+ series[i] = nextValue();
+ }
+ return series;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java
new file mode 100644
index 0000000..c4ed3ba
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+/**
+ * Synthetic series that stays near {@code steadyStateValue} except for one "turbulent" window.
+ *
+ * <p>Outside the window, samples are uniform between
+ * {@code steadyStateValue - steadyStateDeviationPercentage * steadyStateValue} and
+ * {@code steadyStateValue + steadyStateDeviationPercentage * steadyStateValue}. Inside it, samples
+ * are uniform between {@code steadyStateValue + turbulentPeriodDeviationLowerPercentage * steadyStateValue}
+ * and {@code steadyStateValue + turbulentPeriodDeviationHigherPercentage * steadyStateValue}; the
+ * "percentages" are therefore fractions of {@code steadyStateValue}.
+ *
+ * <p>The window covers {@code turbulentPeriodLength} points starting at:
+ * <ul>
+ *   <li>{@code turbulentStatePosition == 1}: a random index in [25%, 50%) of the series length;</li>
+ *   <li>{@code turbulentStatePosition == 2}: {@code length - turbulentPeriodLength}, i.e. the end;</li>
+ *   <li>any other value: index 0.</li>
+ * </ul>
+ */
+public class SteadyWithTurbulenceMetricSeries implements AbstractMetricSeries {
+
+  double steadyStateValue = 0.0;
+  double steadyStateDeviationPercentage = 0.0;
+  double turbulentPeriodDeviationLowerPercentage = 0.3;
+  double turbulentPeriodDeviationHigherPercentage = 0.5;
+  int turbulentPeriodLength = 5;
+  int turbulentStatePosition = 1;
+  int approximateSeriesLength = 10;
+
+  Random random = new Random();
+  // Limits and window start precomputed by init() for the streaming nextValue() API.
+  double valueDeviationLowerLimit;
+  double valueDeviationHigherLimit;
+  double tPeriodLowerLimit;
+  double tPeriodUpperLimit;
+  int tPeriodStartIndex = 0;
+  // Position of the next value returned by nextValue().
+  int index = 0;
+
+  /**
+   * @param approximateSeriesLength length used to place the turbulent window for {@link #nextValue()}
+   * @param steadyStateValue centre value of the steady part of the series
+   * @param steadyStateDeviationPercentage fractional half-width of the steady band
+   * @param turbulentPeriodDeviationLowerPercentage fractional offset of the turbulent band's lower edge
+   * @param turbulentPeriodDeviationHigherPercentage fractional offset of the turbulent band's upper edge
+   * @param turbulentPeriodLength number of points in the turbulent window
+   * @param turbulentStatePosition 1 = window near the middle, 2 = window at the end, other = at start
+   */
+  public SteadyWithTurbulenceMetricSeries(int approximateSeriesLength,
+                                          double steadyStateValue,
+                                          double steadyStateDeviationPercentage,
+                                          double turbulentPeriodDeviationLowerPercentage,
+                                          double turbulentPeriodDeviationHigherPercentage,
+                                          int turbulentPeriodLength,
+                                          int turbulentStatePosition) {
+    this.approximateSeriesLength = approximateSeriesLength;
+    this.steadyStateValue = steadyStateValue;
+    this.steadyStateDeviationPercentage = steadyStateDeviationPercentage;
+    this.turbulentPeriodDeviationLowerPercentage = turbulentPeriodDeviationLowerPercentage;
+    this.turbulentPeriodDeviationHigherPercentage = turbulentPeriodDeviationHigherPercentage;
+    this.turbulentPeriodLength = turbulentPeriodLength;
+    this.turbulentStatePosition = turbulentStatePosition;
+    init();
+  }
+
+  /** Precomputes the turbulent window start and the value limits used by {@link #nextValue()}. */
+  private void init() {
+    tPeriodStartIndex = turbulentStartIndex(approximateSeriesLength);
+
+    valueDeviationLowerLimit = steadyStateValue - steadyStateDeviationPercentage * steadyStateValue;
+    valueDeviationHigherLimit = steadyStateValue + steadyStateDeviationPercentage * steadyStateValue;
+
+    tPeriodLowerLimit = steadyStateValue + turbulentPeriodDeviationLowerPercentage * steadyStateValue;
+    tPeriodUpperLimit = steadyStateValue + turbulentPeriodDeviationHigherPercentage * steadyStateValue;
+  }
+
+  /**
+   * Computes where the turbulent window starts for a series of the given length.
+   * Consumes one random draw only when {@code turbulentStatePosition == 1}.
+   */
+  private int turbulentStartIndex(int seriesLength) {
+    if (turbulentStatePosition == 1) {
+      return (int) (0.25 * seriesLength + (0.25 * seriesLength * random.nextDouble()));
+    } else if (turbulentStatePosition == 2) {
+      return seriesLength - turbulentPeriodLength;
+    }
+    return 0;
+  }
+
+  /** Returns {@code lower + (upper - lower) * u} for a uniform draw {@code u} in [0, 1). */
+  private double uniform(double lower, double upper) {
+    return lower + (upper - lower) * random.nextDouble();
+  }
+
+  @Override
+  public double nextValue() {
+    // Half-open window [start, start + length): exactly turbulentPeriodLength turbulent points,
+    // matching getSeries().
+    boolean turbulent = index >= tPeriodStartIndex
+        && index < (tPeriodStartIndex + turbulentPeriodLength);
+    double value = turbulent
+        ? uniform(tPeriodLowerLimit, tPeriodUpperLimit)
+        : uniform(valueDeviationLowerLimit, valueDeviationHigherLimit);
+    index++;
+    return value;
+  }
+
+  /**
+   * Generates a fresh series of {@code n} points with its own randomly placed turbulent window;
+   * does not advance the {@link #nextValue()} stream.
+   */
+  @Override
+  public double[] getSeries(int n) {
+
+    double[] series = new double[n];
+    int turbulentPeriodStartIndex = turbulentStartIndex(n);
+
+    double valueDevLowerLimit = steadyStateValue - steadyStateDeviationPercentage * steadyStateValue;
+    double valueDevHigherLimit = steadyStateValue + steadyStateDeviationPercentage * steadyStateValue;
+
+    double turbulentPeriodLowerLimit = steadyStateValue + turbulentPeriodDeviationLowerPercentage * steadyStateValue;
+    double turbulentPeriodUpperLimit = steadyStateValue + turbulentPeriodDeviationHigherPercentage * steadyStateValue;
+
+    for (int i = 0; i < n; i++) {
+      boolean turbulent = i >= turbulentPeriodStartIndex
+          && i < (turbulentPeriodStartIndex + turbulentPeriodLength);
+      series[i] = turbulent
+          ? uniform(turbulentPeriodLowerLimit, turbulentPeriodUpperLimit)
+          : uniform(valueDevLowerLimit, valueDevHigherLimit);
+    }
+
+    return series;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
new file mode 100644
index 0000000..d5beb48
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+/**
+ * Synthetic series made of consecutive linear segments ("steps").
+ *
+ * <p>Within a step the base value follows {@code y = m * x + c} with slope
+ * {@code steadyPeriodSlope}, and each emitted sample is uniform between
+ * {@code y - steadyValueDeviationPercentage * y} and {@code y + steadyValueDeviationPercentage * y}.
+ * A step lasts a random number of points between {@code steadyPeriodMinSize} (inclusive) and
+ * {@code steadyPeriodMaxSize} (exclusive when the two differ), and at least 1. At each step
+ * boundary the base value moves up ({@code upwardStep}) or down by
+ * {@code stepChangePercentage * y}.
+ */
+public class StepFunctionMetricSeries implements AbstractMetricSeries {
+
+  double startValue = 0.0;
+  double steadyValueDeviationPercentage = 0.0;
+  double steadyPeriodSlope = 0.5;
+  int steadyPeriodMinSize = 10;
+  int steadyPeriodMaxSize = 20;
+  double stepChangePercentage = 0.0;
+  boolean upwardStep = true;
+
+  Random random = new Random();
+
+  // y = mx + c
+  double y;
+  double m;
+  double x;
+  double c;
+  // Length of the current step and position within it.
+  int currentStepSize;
+  int currentIndex;
+
+  public StepFunctionMetricSeries(double startValue,
+                                  double steadyValueDeviationPercentage,
+                                  double steadyPeriodSlope,
+                                  int steadyPeriodMinSize,
+                                  int steadyPeriodMaxSize,
+                                  double stepChangePercentage,
+                                  boolean upwardStep) {
+    this.startValue = startValue;
+    this.steadyValueDeviationPercentage = steadyValueDeviationPercentage;
+    this.steadyPeriodSlope = steadyPeriodSlope;
+    this.steadyPeriodMinSize = steadyPeriodMinSize;
+    this.steadyPeriodMaxSize = steadyPeriodMaxSize;
+    this.stepChangePercentage = stepChangePercentage;
+    this.upwardStep = upwardStep;
+    init();
+  }
+
+  /** Positions the line so that the first sample's base value equals {@code startValue}. */
+  private void init() {
+    y = startValue;
+    m = steadyPeriodSlope;
+    x = 1;
+    c = y - (m * x);
+
+    currentStepSize = nextStepSize();
+    currentIndex = 0;
+  }
+
+  /**
+   * Samples the length of the next step. Clamped to at least 1: a size of 0 or less would make
+   * {@link #nextValue()} emit placeholder zeros (or, for negative sizes, never advance again).
+   */
+  private int nextStepSize() {
+    int size = (int) (steadyPeriodMinSize + (steadyPeriodMaxSize - steadyPeriodMinSize) * random.nextDouble());
+    return Math.max(1, size);
+  }
+
+  /** Ends the current step: samples a new length, jumps the base value and restarts the line at x = 1. */
+  private void startNextStep() {
+    currentIndex = 0;
+    currentStepSize = nextStepSize();
+    if (upwardStep) {
+      y = y + stepChangePercentage * y;
+    } else {
+      y = y - stepChangePercentage * y;
+    }
+    x = 1;
+    // Re-anchor the intercept so the next base value equals the stepped y.
+    c = y - (m * x);
+  }
+
+  @Override
+  public double nextValue() {
+    y = m * x + c;
+    double valueDeviationLowerLimit = y - steadyValueDeviationPercentage * y;
+    double valueDeviationHigherLimit = y + steadyValueDeviationPercentage * y;
+    double value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
+    x++;
+    currentIndex++;
+
+    if (currentIndex >= currentStepSize) {
+      startNextStep();
+    }
+
+    return value;
+  }
+
+  @Override
+  public double[] getSeries(int n) {
+    double[] series = new double[n];
+    for (int i = 0; i < n; i++) {
+      series[i] = nextValue();
+    }
+    return series;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
new file mode 100644
index 0000000..a2b0eea
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+/**
+ * Synthetic series uniformly distributed around {@code value}, with optional outliers.
+ *
+ * <p>With probability {@code 1 - outlierProbability} a sample is uniform between
+ * {@code value - deviationPercentage * value} and {@code value + deviationPercentage * value}.
+ * Otherwise it is an outlier whose offset from {@code value} lies between
+ * {@code outlierDeviationLowerPercentage * value} and {@code outlierDeviationHigherPercentage * value},
+ * added when {@code outliersAboveValue} is true and subtracted otherwise.
+ */
+public class UniformMetricSeries implements AbstractMetricSeries {
+
+  double value = 0.0;
+  double deviationPercentage = 0.0;
+  double outlierProbability = 0.0;
+  double outlierDeviationLowerPercentage = 0.0;
+  double outlierDeviationHigherPercentage = 0.0;
+  boolean outliersAboveValue = true;
+
+  Random random = new Random();
+  // Sampling bounds derived from the configuration in init().
+  double valueDeviationLowerLimit;
+  double valueDeviationHigherLimit;
+  double outlierLeftLowerLimit;
+  double outlierLeftHigherLimit;
+  double outlierRightLowerLimit;
+  double outlierRightUpperLimit;
+  double nonOutlierProbability;
+
+
+  public UniformMetricSeries(double value,
+                             double deviationPercentage,
+                             double outlierProbability,
+                             double outlierDeviationLowerPercentage,
+                             double outlierDeviationHigherPercentage,
+                             boolean outliersAboveValue) {
+    this.value = value;
+    this.deviationPercentage = deviationPercentage;
+    this.outlierProbability = outlierProbability;
+    this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage;
+    this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage;
+    this.outliersAboveValue = outliersAboveValue;
+    init();
+  }
+
+  /** Precomputes the regular and outlier sampling bounds. */
+  private void init() {
+    valueDeviationLowerLimit = value - deviationPercentage * value;
+    valueDeviationHigherLimit = value + deviationPercentage * value;
+
+    outlierLeftLowerLimit = value - outlierDeviationHigherPercentage * value;
+    outlierLeftHigherLimit = value - outlierDeviationLowerPercentage * value;
+    outlierRightLowerLimit = value + outlierDeviationLowerPercentage * value;
+    outlierRightUpperLimit = value + outlierDeviationHigherPercentage * value;
+
+    nonOutlierProbability = 1.0 - outlierProbability;
+  }
+
+  /** Returns {@code lower + (upper - lower) * u} for a uniform draw {@code u} in [0, 1). */
+  private double uniform(double lower, double upper) {
+    return lower + (upper - lower) * random.nextDouble();
+  }
+
+  @Override
+  public double nextValue() {
+    // No local named `value` here: it would shadow the configured centre value field.
+    double probability = random.nextDouble();
+
+    if (probability <= nonOutlierProbability) {
+      return uniform(valueDeviationLowerLimit, valueDeviationHigherLimit);
+    }
+    if (outliersAboveValue) {
+      return uniform(outlierRightLowerLimit, outlierRightUpperLimit);
+    }
+    return uniform(outlierLeftLowerLimit, outlierLeftHigherLimit);
+  }
+
+  @Override
+  public double[] getSeries(int n) {
+    double[] series = new double[n];
+    for (int i = 0; i < n; i++) {
+      series[i] = nextValue();
+    }
+    return series;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/ema.R
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/ema.R b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/ema.R
new file mode 100644
index 0000000..0b66095
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/ema.R
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# EMA <- w * EMA + (1 - w) * x
+# EMS <- sqrt( w * EMS^2 + (1 - w) * (x - EMA)^2 )
+# Alarm = abs(x - EMA) > n * EMS
+
+# Flags points of test_data whose distance from the exponential moving
+# average (EMA) exceeds n exponential moving standard deviations (EMS).
+#
+# train_data: numeric vector of historical values, or a data frame whose
+#             second column holds them (the form test.R passes).
+# test_data:  data frame/matrix with timestamps in column 1, values in column 2.
+# w:          weight kept from the previous EMA/EMS at each update.
+# n:          number of EMS units a point may deviate before it is flagged.
+# Returns a data frame with columns "TS" and "Value" (empty if nothing flagged).
+ema_global <- function(train_data, test_data, w, n) {
+
+  anomalies <- data.frame()
+  ema <- 0
+  ems <- 0
+
+  # A `for` loop over a data frame walks its columns, which would turn
+  # ema/ems into vectors; use the value column instead.
+  train_values <- if (is.data.frame(train_data)) train_data[, 2] else train_data
+
+  # Train step: warm the model up on historical values.
+  for (x in train_values) {
+    ema <- w * ema + (1 - w) * x
+    ems <- sqrt(w * ems^2 + (1 - w) * (x - ema)^2)
+  }
+
+  # Test step: check each point against the current model, then fold it in.
+  # seq_len() keeps this a no-op for empty test data (1:0 would visit 1 and 0).
+  for (i in seq_len(nrow(test_data))) {
+    x <- test_data[i, 2]
+    if (abs(x - ema) > n * ems) {
+      anomaly <- c(as.numeric(test_data[i, 1]), x)
+      anomalies <- rbind(anomalies, anomaly)
+    }
+    ema <- w * ema + (1 - w) * x
+    ems <- sqrt(w * ems^2 + (1 - w) * (x - ema)^2)
+  }
+
+  if (length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Value")
+  }
+  return(anomalies)
+}
+
+# Per-weekday variant of ema_global: keeps a separate EMA/EMS pair for each
+# day of the week (GMT) and tests each point against its own weekday's model.
+#
+# train_data, test_data: data frames with epoch-millisecond timestamps in
+#   column 1 (divided by 1000 before as.POSIXlt) and values in column 2.
+# w: weight kept from the previous EMA/EMS; n: EMS units tolerated.
+# Returns a data frame with columns "TS" and "Value" (empty if nothing flagged).
+ema_daily <- function(train_data, test_data, w, n) {
+
+  anomalies <- data.frame()
+  # POSIXlt$wday is 0 (Sunday) .. 6 (Saturday); slot wday + 1 is used in both
+  # the train and test loops so they address the same weekday model.
+  ema <- vector("numeric", 7)
+  ems <- vector("numeric", 7)
+
+  # Train step
+  for (i in seq_len(nrow(train_data))) {
+    x <- train_data[i, 2]
+    time <- as.POSIXlt(as.numeric(train_data[i, 1]) / 1000, origin = "1970-01-01", tz = "GMT")
+    index <- time$wday + 1
+    ema[index] <- w * ema[index] + (1 - w) * x
+    ems[index] <- sqrt(w * ems[index]^2 + (1 - w) * (x - ema[index])^2)
+  }
+
+  # Test step: check against the weekday model, then update it.
+  for (i in seq_len(nrow(test_data))) {
+    x <- test_data[i, 2]
+    time <- as.POSIXlt(as.numeric(test_data[i, 1]) / 1000, origin = "1970-01-01", tz = "GMT")
+    index <- time$wday + 1
+
+    if (abs(x - ema[index]) > n * ems[index]) {
+      anomaly <- c(as.numeric(test_data[i, 1]), x)
+      anomalies <- rbind(anomalies, anomaly)
+    }
+    ema[index] <- w * ema[index] + (1 - w) * x
+    ems[index] <- sqrt(w * ems[index]^2 + (1 - w) * (x - ema[index])^2)
+  }
+
+  if (length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Value")
+  }
+  return(anomalies)
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/hsdev.r
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/hsdev.r b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/hsdev.r
new file mode 100644
index 0000000..bca3366
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/hsdev.r
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Flags the test window as a whole when its median deviates from the median
+# of historical values recorded on the same weekday (GMT) by more than n
+# standard deviations of those historical values.
+#
+# train_data, test_data: data frames with epoch-millisecond timestamps in
+#   column 1 and values in column 2.
+# n: number of historical standard deviations tolerated.
+# num_historic_periods, period: history reaches back
+#   num_historic_periods * period before the first test timestamp.
+# interval: the history start is rounded down to a multiple of interval.
+# Returns a one-row data frame (TS Start, TS End, Current Median, Past Median,
+# Past SD) when flagged, otherwise an empty data frame.
+hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval, period) {
+
+  anomalies <- data.frame()
+
+  test_start <- test_data[1, 1]
+  # Last test timestamp; nrow() is required because length() of a data
+  # frame row is its column count.
+  test_end <- test_data[nrow(test_data), 1]
+  train_start <- test_start - num_historic_periods * period
+  # Round down to a multiple of interval (start of day when interval is one day).
+  train_start <- train_start - (train_start %% interval)
+
+  time <- as.POSIXlt(as.numeric(test_start) / 1000, origin = "1970-01-01", tz = "GMT")
+  test_data_day <- time$wday
+
+  # Walk history newest-first and stop once it predates train_start; the
+  # early break assumes train_data is sorted by timestamp.
+  h_data <- c()
+  for (i in rev(seq_len(nrow(train_data)))) {
+    ts <- train_data[i, 1]
+    if (ts < train_start) {
+      break
+    }
+    time <- as.POSIXlt(as.numeric(ts) / 1000, origin = "1970-01-01", tz = "GMT")
+    if (time$wday == test_data_day) {
+      h_data <- c(h_data, train_data[i, 2])
+    }
+  }
+
+  # Require at least twice as many historical points as test points.
+  if (length(h_data) < 2 * nrow(test_data)) {
+    cat("\nNot enough training data")
+    return(anomalies)
+  }
+
+  past_median <- median(h_data)
+  past_sd <- sd(h_data)
+  curr_median <- median(test_data[, 2])
+
+  if (abs(curr_median - past_median) > n * past_sd) {
+    anomaly <- c(test_start, test_end, curr_median, past_median, past_sd)
+    anomalies <- rbind(anomalies, anomaly)
+  }
+
+  if (length(anomalies) > 0) {
+    names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", "Past SD")
+  }
+
+  return(anomalies)
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/iforest.R
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/iforest.R b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/iforest.R
new file mode 100644
index 0000000..8956400
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/iforest.R
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Isolation-forest anomaly detection over several metrics fetched from `url`.
+# All metrics are joined on their TS column, a forest is trained on rows with
+# TS in [train_start, train_end], and rows with TS in [test_start, test_end]
+# whose anomaly score exceeds threshold_score are reported.
+# Depends on get_data(), IsolationTrees() and AnomalyScore(), defined outside
+# this file (NOTE(review): presumably the IsolationForest package — confirm
+# it is loaded).
+# Returns a data frame with columns "TS", "Anomaly Score", "Path length".
+ams_iforest <- function(url, train_start, train_end, test_start, test_end, threshold_score) {
+
+  res <- get_data(url)
+  num_metrics <- length(res$metrics)
+  anomalies <- data.frame()
+
+  data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+  names(data) <- c("TS", res$metrics[[1]]$metricname)
+
+  # Join every remaining metric; seq_len()[-1] is empty for a single metric
+  # (2:num_metrics would count down to 1).
+  for (i in seq_len(num_metrics)[-1]) {
+    df <- data.frame(as.numeric(names(res$metrics[[i]]$metrics)), as.numeric(res$metrics[[i]]$metrics))
+    names(df) <- c("TS", res$metrics[[i]]$metricname)
+    data <- merge(data, df)
+  }
+
+  # Metric columns follow the TS column.
+  metric_cols <- 2:(num_metrics + 1)
+  # Filter on the merged frame's own TS: row positions of the last per-metric
+  # frame do not line up with `data` after merge().
+  algo_data <- data[which(data$TS >= train_start & data$TS <= train_end), ][metric_cols]
+  iForest <- IsolationTrees(algo_data)
+  test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+
+  if_res <- AnomalyScore(test_data[metric_cols], iForest)
+  for (i in seq_along(if_res$outF)) {
+    if (if_res$outF[i] > threshold_score) {
+      anomaly <- c(test_data[i, 1], if_res$outF[i], if_res$pathLength[i])
+      anomalies <- rbind(anomalies, anomaly)
+    }
+  }
+
+  if (length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Anomaly Score", "Path length")
+  }
+  return(anomalies)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/kstest.r
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/kstest.r b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/kstest.r
new file mode 100644
index 0000000..f22bc15
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/kstest.r
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Two-sample Kolmogorov-Smirnov test between the training and test values.
+#
+# train_data, test_data: data frames with timestamps in column 1 and values
+#   in column 2.
+# p_value: significance level; the test window is flagged when the KS p-value
+#   falls below it.
+# Returns a one-row data frame (TS Start, TS End, D, p-value) when flagged,
+# otherwise an empty data frame.
+ams_ks <- function(train_data, test_data, p_value) {
+
+  anomalies <- data.frame()
+  res <- ks.test(train_data[, 2], test_data[, 2])
+
+  # Read the htest components by name rather than by list position.
+  if (res$p.value < p_value) {
+    # nrow() gives the last test row; length() of a data frame is its column count.
+    anomaly <- c(test_data[1, 1], test_data[nrow(test_data), 1],
+                 unname(res$statistic), res$p.value)
+    anomalies <- rbind(anomalies, anomaly)
+  }
+
+  if (length(anomalies) > 0) {
+    names(anomalies) <- c("TS Start", "TS End", "D", "p-value")
+  }
+  return(anomalies)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/test.R
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/test.R b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/test.R
new file mode 100644
index 0000000..7650356
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/test.R
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+# Global accumulators for per-method anomaly results.
+tukeys_anomalies <- data.frame()
+ema_global_anomalies <- data.frame()
+ema_daily_anomalies <- data.frame()
+ks_anomalies <- data.frame()
+hsdev_anomalies <- data.frame()
+
+# Resets every global accumulator to an empty data frame. `<<-` is required:
+# a plain `<-` would only create function-local copies and leave the globals
+# untouched, so results would pile up across runs.
+init <- function() {
+  tukeys_anomalies <<- data.frame()
+  ema_global_anomalies <<- data.frame()
+  ema_daily_anomalies <<- data.frame()
+  ks_anomalies <<- data.frame()
+  hsdev_anomalies <<- data.frame()
+}
+
+# Slides a one-day train window and the following test window across `data`
+# (timestamps in column TS / column 1, values in column 2) and runs the
+# enabled detectors on each pair. Only hsdev_daily is currently enabled; it
+# receives all data before the test window as history. Results are appended
+# to the global hsdev_anomalies, which is also returned. Prints the iteration
+# counter on every pass.
+test_methods <- function(data) {
+
+  init()
+  #res <- get_data(url)
+  #data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+  #names(data) <- c("TS", res$metrics[[1]]$metricname)
+
+  one_day <- 24*60*60*1000
+  step <- data[2, 1] - data[1, 1]
+  limit <- data[length(data[, 1]), 1]
+
+  window_start <- data[1, 1]
+  train_end <- get_next_day_boundary(window_start, step, limit)
+  test_start <- train_end + step
+  test_end <- get_next_day_boundary(test_start, step, limit)
+  iteration <- 1
+
+  while (test_start < limit) {
+
+    print(iteration)
+    iteration <- iteration + 1
+
+    in_train <- data$TS >= window_start & data$TS <= train_end
+    in_test <- data$TS >= test_start & data$TS <= test_end
+    train_data <- data[which(in_train), ]
+    test_data <- data[which(in_test), ]
+
+    #tukeys_anomalies <<- rbind(tukeys_anomalies, ams_tukeys(train_data, test_data, 3))
+    #ema_global_anomalies <<- rbind(ema_global_anomalies, ema_global(train_data, test_data, 0.9, 3))
+    #ema_daily_anomalies <<- rbind(ema_daily_anomalies, ema_daily(train_data, test_data, 0.9, 3))
+    #ks_anomalies <<- rbind(ks_anomalies, ams_ks(train_data, test_data, 0.05))
+    history <- data[which(data$TS < test_start), ]
+    hsdev_anomalies <<- rbind(hsdev_anomalies, hsdev_daily(history, test_data, 3, 3, one_day, 7 * one_day))
+
+    # Slide forward: the old test window becomes the next training window.
+    window_start <- test_start
+    train_end <- get_next_day_boundary(window_start, step, limit)
+    test_start <- train_end + step
+    test_end <- get_next_day_boundary(test_start, step, limit)
+  }
+  hsdev_anomalies
+}
+
+# Returns the first timestamp reachable from `start` in increments of `step`
+# (not passing `limit`) for which start %% day_ms == offset_ms. With the
+# defaults (ms timestamps, offset 28800000 = 8 h) that is 08:00 UTC.
+# Returns -1 when `start` is already past `limit`; if no aligned timestamp is
+# found, returns the first value beyond `limit`.
+# day_ms and offset_ms default to the previously hard-coded values, so
+# existing three-argument calls behave as before.
+get_next_day_boundary <- function(start, step, limit,
+                                  day_ms = 24*60*60*1000, offset_ms = 28800000) {
+
+  if (start > limit) {
+    return(-1)
+  }
+  # A non-positive step never advances and would loop forever.
+  if (step <= 0) {
+    stop("step must be positive")
+  }
+
+  while (start <= limit) {
+    if (start %% day_ms == offset_ms) {
+      return(start)
+    }
+    start <- start + step
+  }
+  return(start)
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/tukeys.r
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/tukeys.r b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/tukeys.r
new file mode 100644
index 0000000..0312226
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/R-scripts/tukeys.r
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Tukey's fences: flags test points below Q1 - n*IQR or above Q3 + n*IQR,
+# where the quartiles come from the training values.
+#
+# train_data, test_data: data frames with timestamps in column 1 and values
+#   in column 2.
+# n: fence multiplier applied to the inter-quartile range.
+# Returns a data frame with columns "TS", "Value" and "niqr" (distance beyond
+# the nearer quartile in IQR units; 0 when the IQR is 0).
+ams_tukeys <- function(train_data, test_data, n) {
+
+  anomalies <- data.frame()
+  quantiles <- unname(quantile(train_data[, 2]))
+  q1 <- quantiles[2]
+  q3 <- quantiles[4]
+  iqr <- q3 - q1
+  # Fences do not depend on the test point, so compute them once.
+  lb <- q1 - n * iqr
+  ub <- q3 + n * iqr
+
+  # seq_len() keeps this a no-op for empty test data (1:0 would visit 1 and 0).
+  for (i in seq_len(nrow(test_data))) {
+    x <- test_data[i, 2]
+    if ((x < lb) || (x > ub)) {
+      niqr <- 0
+      if (iqr != 0) {
+        niqr <- if (x < lb) (q1 - x) / iqr else (x - q3) / iqr
+      }
+      anomaly <- c(test_data[i, 1], x, niqr)
+      anomalies <- rbind(anomalies, anomaly)
+    }
+  }
+  if (length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Value", "niqr")
+  }
+  return(anomalies)
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/input-config.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/input-config.properties b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/input-config.properties
new file mode 100644
index 0000000..ab106c4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/resources/input-config.properties
@@ -0,0 +1,42 @@
+# Copyright 2011 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
appIds=HOST

collectorHost=localhost
collectorPort=6188
collectorProtocol=http

zkQuorum=localhost:2181

ambariServerHost=localhost
clusterName=c1

emaW=0.8
emaN=3
tukeysN=3
pointInTimeTestInterval=300000
pointInTimeTrainInterval=900000

ksTestInterval=600000
ksTrainInterval=600000
hsdevNhp=3
# Plain number: a trailing ';' would become part of the property value.
hsdevInterval=1800000

skipMetricPatterns=sdisk*,cpu_sintr*,proc*,disk*,boottime
# NOTE(review): host list is environment-specific; adjust per deployment.
hosts=avijayan-ad-1.openstacklocal
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
new file mode 100644
index 0000000..324058b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.spark
+
+
+import java.io.{FileInputStream, IOException, InputStream}
+import java.util
+import java.util.Properties
+import java.util.logging.LogManager
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.ambari.metrics.alertservice.prototype.core.MetricsCollectorInterface
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.kafka._
+import org.apache.ambari.metrics.alertservice.prototype.methods.{AnomalyDetectionTechnique, MetricAnomaly}
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.{EmaModelLoader, EmaTechnique}
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics
+import org.apache.log4j.Logger
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Prototype entry point for streaming anomaly detection over metrics read from Kafka.
+ *
+ * NOTE(review): every member of this object is currently commented out, so it compiles to an
+ * empty singleton with no `main` method. The commented code below sketches the intended
+ * pipeline, but it would not compile as written (see the NOTE(review) comments inside).
+ */
+object MetricAnomalyDetector {
+
+ /*
+ Load current EMA model
+ Filter step - Check if anomaly
+ Collect / Write to AMS / Print.
+ */
+
+// var brokers = "avijayan-ams-1.openstacklocal:2181,avijayan-ams-2.openstacklocal:2181,avijayan-ams-3.openstacklocal:2181"
+// NOTE(review): these addresses use port 2181, which looks like ZooKeeper rather than Kafka
+// brokers, yet they are passed as "metadata.broker.list" below — TODO confirm.
+// var groupId = "ambari-metrics-group"
+// var topicName = "ambari-metrics-topic"
+// var numThreads = 1
+// val anomalyDetectionModels: Array[AnomalyDetectionTechnique] = Array[AnomalyDetectionTechnique]()
+//
+// NOTE(review): readProperties returns null on IOException and main dereferences the result
+// without a null check; the opened InputStream is also never closed.
+// def readProperties(propertiesFile: String): Properties = try {
+// val properties = new Properties
+// var inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile)
+// if (inputStream == null) inputStream = new FileInputStream(propertiesFile)
+// properties.load(inputStream)
+// properties
+// } catch {
+// case ioEx: IOException =>
+// null
+// }
+//
+// def main(args: Array[String]): Unit = {
+//
+// @transient
+// lazy val log = org.apache.log4j.LogManager.getLogger("MetricAnomalyDetectorLogger")
+//
+// if (args.length < 1) {
+// System.err.println("Usage: MetricSparkConsumer <input-config-file>")
+// System.exit(1)
+// }
+//
+// //Read properties
+// val properties = readProperties(propertiesFile = args(0))
+//
+// //Load EMA parameters - w, n
+// val emaW = properties.getProperty("emaW").toDouble
+// val emaN = properties.getProperty("emaN").toDouble
+//
+// //collector info
+// val collectorHost: String = properties.getProperty("collectorHost")
+// val collectorPort: String = properties.getProperty("collectorPort")
+// val collectorProtocol: String = properties.getProperty("collectorProtocol")
+// val anomalyMetricPublisher = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort)
+//
+// //Instantiate Kafka stream reader
+// val sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector")
+// val streamingContext = new StreamingContext(sparkConf, Duration(10000))
+//
+// NOTE(review): topicName.toSet builds a Set[Char] of the topic's characters, not
+// Set(topicName); topicsSet and kafkaParams are unused while createDirectStream is disabled.
+// val topicsSet = topicName.toSet
+// val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
+//// val stream = KafkaUtils.createDirectStream()
+//
+// NOTE(review): zkQuorum is not defined in this object; it would presumably need to be read
+// from the properties (e.g. properties.getProperty("zkQuorum")) — TODO.
+// val kafkaStream = KafkaUtils.createStream(streamingContext, zkQuorum, groupId, Map(topicName -> numThreads), StorageLevel.MEMORY_AND_DISK_SER_2)
+// kafkaStream.print()
+//
+// var timelineMetricsStream = kafkaStream.map( message => {
+// val mapper = new ObjectMapper
+// val metrics = mapper.readValue(message._2, classOf[TimelineMetrics])
+// metrics
+// })
+// timelineMetricsStream.print()
+//
+// var appMetricStream = timelineMetricsStream.map( timelineMetrics => {
+// (timelineMetrics.getMetrics.get(0).getAppId, timelineMetrics)
+// })
+// appMetricStream.print()
+//
+// NOTE(review): appIds is not defined in this object — TODO read it from the properties.
+// var filteredAppMetricStream = appMetricStream.filter( appMetricTuple => {
+// appIds.contains(appMetricTuple._1)
+// } )
+// filteredAppMetricStream.print()
+//
+// NOTE(review): only `log` is declared above; `logger` and `emaModel` are undefined here.
+// getMetrics appears to be a Java collection (.get(0) is called on it), so the for-loop
+// below presumably needs a Java-to-Scala conversion (e.g. JavaConverters) — TODO confirm.
+// filteredAppMetricStream.foreachRDD( rdd => {
+// rdd.foreach( appMetricTuple => {
+// val timelineMetrics = appMetricTuple._2
+// logger.info("Received Metric (1): " + timelineMetrics.getMetrics.get(0).getMetricName)
+// log.info("Received Metric (2): " + timelineMetrics.getMetrics.get(0).getMetricName)
+// for (timelineMetric <- timelineMetrics.getMetrics) {
+// var anomalies = emaModel.test(timelineMetric)
+// anomalyMetricPublisher.publish(anomalies)
+// }
+// })
+// })
+//
+// streamingContext.start()
+// streamingContext.awaitTermination()
+// }
+ }
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
new file mode 100644
index 0000000..ccded6b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.spark
+
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+
+object SparkPhoenixReader {
+
+  /**
+   * Builds an EMA model for one metric series from the last 24 hours of METRIC_RECORD rows
+   * read through the Phoenix Spark connector, then saves the model to `model_dir`.
+   *
+   * Arguments (all seven are required):
+   *   0. metric_name             - METRIC_NAME to read
+   *   1. appId                   - APP_ID to read
+   *   2. hostname                - HOSTNAME to read
+   *   3. weight                  - EMA starting weight (parsed as Double)
+   *   4. timessdev               - standard-deviation multiplier for the model (parsed as Int)
+   *   5. phoenixConnectionString - Phoenix zkUrl, e.g. host:61181:/ams-hbase-unsecure
+   *   6. model_dir               - directory passed to EmaTechnique.save
+   */
+  def main(args: Array[String]): Unit = {
+
+    // args(6) is read below and the usage lists seven arguments, so anything shorter must be
+    // rejected here instead of failing later with an ArrayIndexOutOfBoundsException.
+    if (args.length < 7) {
+      System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>")
+      System.exit(1)
+    }
+
+    val metricName = args(0)
+    val appId = args(1)
+    val hostname = args(2)
+    val weight = args(3).toDouble
+    val timessdev = args(4).toInt
+    val phoenixConnectionString = args(5)
+    val modelDir = args(6)
+
+    val conf = new SparkConf()
+    conf.set("spark.app.name", "AMSAnomalyModelBuilder")
+    //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077")
+
+    val sc = new SparkContext(conf)
+    try {
+      val sqlContext = new SQLContext(sc)
+
+      val currentTime = System.currentTimeMillis()
+      val oneDayBack = currentTime - 24 * 60 * 60 * 1000
+
+      val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString))
+
+      // Filter with Column expressions rather than concatenating the arguments into a SQL
+      // string, so names containing quotes cannot break or alter the query.
+      val result = df
+        .filter((df("METRIC_NAME") === metricName) && (df("HOSTNAME") === hostname) &&
+          (df("APP_ID") === appId) && (df("SERVER_TIME") < currentTime) && (df("SERVER_TIME") > oneDayBack))
+        .select("METRIC_NAME", "HOSTNAME", "APP_ID", "SERVER_TIME", "METRIC_SUM", "METRIC_COUNT")
+
+      // Keyed by SERVER_TIME (column 3); the value is METRIC_SUM / METRIC_COUNT for that row.
+      val metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double]
+      result.collect().foreach(
+        t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5))
+      )
+
+      val timelineMetric = new TimelineMetric()
+      timelineMetric.setMetricName(metricName)
+      timelineMetric.setAppId(appId)
+      timelineMetric.setHostName(hostname)
+      timelineMetric.setMetricValues(metricValues)
+
+      val emaModel = new EmaTechnique(weight, timessdev)
+      emaModel.test(timelineMetric)
+      emaModel.save(sc, modelDir)
+    } finally {
+      // Release the SparkContext even if loading, training or saving fails.
+      sc.stop()
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
new file mode 100644
index 0000000..a0b06e6
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.core.RFunctionInvoker;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+import java.util.TreeMap;
+
+import static org.apache.ambari.metrics.alertservice.prototype.TestRFunctionInvoker.getTS;
+
+public class TestEmaTechnique {
+
+  private static double[] ts;
+  private static String fullFilePath;
+
+  /**
+   * Runs once before the tests. The tests in this class are skipped (via {@link Assume}) unless
+   * the R_HOME environment variable is set. Otherwise the "R-scripts" system classpath resource
+   * is resolved to an absolute path and passed to {@link RFunctionInvoker#setScriptsDir}.
+   */
+  @BeforeClass
+  public static void init() throws URISyntaxException {
+
+    Assume.assumeTrue(System.getenv("R_HOME") != null);
+    ts = getTS(1000);
+    URL url = ClassLoader.getSystemResource("R-scripts");
+    // Fail with a clear message instead of an NPE on url.toURI() when the resource is missing.
+    Assert.assertNotNull("R-scripts resource not found on the system classpath", url);
+    fullFilePath = new File(url.toURI()).getAbsolutePath();
+    RFunctionInvoker.setScriptsDir(fullFilePath);
+  }
+
+  /**
+   * Checks that a freshly built model tracks no EMAs and that its getters report the
+   * constructor arguments.
+   */
+  @Test
+  public void testEmaInitialization() {
+
+    EmaTechnique ema = new EmaTechnique(0.5, 3);
+    Assert.assertTrue(ema.getTrackedEmas().isEmpty());
+    Assert.assertEquals(0.5, ema.getStartingWeight(), 0.0);
+    // The model is constructed with 3 standard deviations; the previous expectation of 2
+    // contradicted the constructor argument.
+    Assert.assertEquals(3.0, ema.getStartTimesSdev(), 0.0);
+  }
+
+  /**
+   * Trains on one random series and tests a second one. It then checks how the anomaly count
+   * changes after updateModel(metric, false, 20) and after updateModel(metric, true, 50).
+   */
+  @Test
+  public void testEma() {
+    EmaTechnique ema = new EmaTechnique(0.5, 3);
+
+    long now = System.currentTimeMillis();
+
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("M1");
+    metric1.setHostName("H1");
+    metric1.setStartTime(now - 1000);
+    metric1.setAppId("A1");
+    metric1.setInstanceId(null);
+    metric1.setType("Integer");
+
+    //Train
+    metric1.setMetricValues(randomSeries(now, 50));
+    List<MetricAnomaly> anomalyList = ema.test(metric1);
+//    Assert.assertTrue(anomalyList.isEmpty());
+
+    // NOTE(review): this series is drawn from the same distribution as the training series, so
+    // expecting anomalies depends on EmaTechnique internals and on Math.random() — may be flaky.
+    metric1.setMetricValues(randomSeries(now, 50));
+    anomalyList = ema.test(metric1);
+    Assert.assertFalse(anomalyList.isEmpty());
+    int l1 = anomalyList.size();
+
+    Assert.assertTrue(ema.updateModel(metric1, false, 20));
+    anomalyList = ema.test(metric1);
+    int l2 = anomalyList.size();
+    Assert.assertTrue(l2 < l1);
+
+    Assert.assertTrue(ema.updateModel(metric1, true, 50));
+    anomalyList = ema.test(metric1);
+    int l3 = anomalyList.size();
+    Assert.assertTrue(l3 > l2 && l3 > l1);
+
+  }
+
+  /**
+   * Builds {@code count} points spaced 100 ms apart, ending at {@code now}. Each value is
+   * 20000 plus a uniform random offset in [0, 1).
+   */
+  private static TreeMap<Long, Double> randomSeries(long now, int count) {
+    TreeMap<Long, Double> values = new TreeMap<Long, Double>();
+    for (int i = 0; i < count; i++) {
+      values.put(now - i * 100, 20000 + Math.random());
+    }
+    return values;
+  }
+}