You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2017/09/21 22:41:34 UTC
[3/4] ambari git commit: AMBARI-21686 : Implement a test driver that
provides a set of metric series with different kinds of metric behavior.
(avijayan)
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
new file mode 100644
index 0000000..7735d6c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+import scala.Tuple2;
+
+import java.util.*;
+
+public class MetricSparkConsumer {
+
+ private static final Log LOG = LogFactory.getLog(MetricSparkConsumer.class);
+ private static String groupId = "ambari-metrics-group";
+ private static String topicName = "ambari-metrics-topic";
+ private static int numThreads = 1;
+ private static long pitStartTime = System.currentTimeMillis();
+ private static long ksStartTime = pitStartTime;
+ private static long hdevStartTime = ksStartTime;
+
+ public MetricSparkConsumer() {
+ }
+
+ public static void main(String[] args) throws InterruptedException {
+
+ if (args.length < 5) {
+ System.err.println("Usage: MetricSparkConsumer <appid1,appid2> <collector_host> <port> <protocol> <zkQuorum>");
+ System.exit(1);
+ }
+
+ List<String> appIds = Arrays.asList(args[0].split(","));
+ String collectorHost = args[1];
+ String collectorPort = args[2];
+ String collectorProtocol = args[3];
+ String zkQuorum = args[4];
+
+ double emaW = StringUtils.isNotEmpty(args[5]) ? Double.parseDouble(args[5]) : 0.5;
+ double emaN = StringUtils.isNotEmpty(args[8]) ? Double.parseDouble(args[6]) : 3;
+ double tukeysN = StringUtils.isNotEmpty(args[7]) ? Double.parseDouble(args[7]) : 3;
+
+ long pitTestInterval = StringUtils.isNotEmpty(args[8]) ? Long.parseLong(args[8]) : 5 * 60 * 1000;
+ long pitTrainInterval = StringUtils.isNotEmpty(args[9]) ? Long.parseLong(args[9]) : 15 * 60 * 1000;
+
+ String fileName = args[10];
+ long ksTestInterval = StringUtils.isNotEmpty(args[11]) ? Long.parseLong(args[11]) : 10 * 60 * 1000;
+ long ksTrainInterval = StringUtils.isNotEmpty(args[12]) ? Long.parseLong(args[12]) : 10 * 60 * 1000;
+ int hsdevNhp = StringUtils.isNotEmpty(args[13]) ? Integer.parseInt(args[13]) : 3;
+ long hsdevInterval = StringUtils.isNotEmpty(args[14]) ? Long.parseLong(args[14]) : 30 * 60 * 1000;
+
+ String ambariServerHost = args[15];
+ String clusterName = args[16];
+
+ MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort);
+
+ SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector");
+
+ JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
+
+ EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN);
+ PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(metricsCollectorInterface,
+ tukeysN,
+ pitTestInterval,
+ pitTrainInterval,
+ ambariServerHost,
+ clusterName);
+
+ TrendADSystem trendADSystem = new TrendADSystem(metricsCollectorInterface,
+ ksTestInterval,
+ ksTrainInterval,
+ hsdevNhp,
+ fileName);
+
+ Broadcast<EmaTechnique> emaTechniqueBroadcast = jssc.sparkContext().broadcast(emaTechnique);
+ Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast = jssc.sparkContext().broadcast(pointInTimeADSystem);
+ Broadcast<TrendADSystem> trendADSystemBroadcast = jssc.sparkContext().broadcast(trendADSystem);
+ Broadcast<MetricsCollectorInterface> metricsCollectorInterfaceBroadcast = jssc.sparkContext().broadcast(metricsCollectorInterface);
+
+ JavaPairReceiverInputDStream<String, String> messages =
+ KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));
+
+ //Convert JSON string to TimelineMetrics.
+ JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() {
+ @Override
+ public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class);
+ return metrics;
+ }
+ });
+
+ timelineMetricsStream.print();
+
+ //Group TimelineMetric by AppId.
+ JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
+ timelineMetrics -> timelineMetrics.getMetrics().isEmpty() ? new Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics)
+ );
+
+ appMetricStream.print();
+
+ //Filter AppIds that are not needed.
+ JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
+ @Override
+ public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
+ return appIds.contains(appMetricTuple._1);
+ }
+ });
+
+ filteredAppMetricStream.print();
+
+ filteredAppMetricStream.foreachRDD(rdd -> {
+ rdd.foreach(
+ tuple2 -> {
+ long currentTime = System.currentTimeMillis();
+ EmaTechnique ema = emaTechniqueBroadcast.getValue();
+ if (currentTime > pitStartTime + pitTestInterval) {
+ LOG.info("Running Tukeys....");
+ pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime);
+ pitStartTime = pitStartTime + pitTestInterval;
+ }
+
+ if (currentTime > ksStartTime + ksTestInterval) {
+ LOG.info("Running KS Test....");
+ trendADSystemBroadcast.getValue().runKSTest(currentTime);
+ ksStartTime = ksStartTime + ksTestInterval;
+ }
+
+ if (currentTime > hdevStartTime + hsdevInterval) {
+ LOG.info("Running HSdev Test....");
+ trendADSystemBroadcast.getValue().runHsdevMethod();
+ hdevStartTime = hdevStartTime + hsdevInterval;
+ }
+
+ TimelineMetrics metrics = tuple2._2();
+ for (TimelineMetric timelineMetric : metrics.getMetrics()) {
+ List<MetricAnomaly> anomalies = ema.test(timelineMetric);
+ metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
+ }
+ });
+ });
+
+ jssc.start();
+ jssc.awaitTermination();
+ }
+}
+
+
+
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
new file mode 100644
index 0000000..7b3f63d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.TreeMap;
+
+public class MetricsCollectorInterface implements Serializable {
+
+ private static String hostName = null;
+ private String instanceId = null;
+ public final static String serviceName = "anomaly-engine";
+ private String collectorHost;
+ private String protocol;
+ private String port;
+ private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics";
+ private static final Log LOG = LogFactory.getLog(MetricsCollectorInterface.class);
+ private static ObjectMapper mapper;
+ private final static ObjectReader timelineObjectReader;
+
+ static {
+ mapper = new ObjectMapper();
+ AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+ mapper.setAnnotationIntrospector(introspector);
+ mapper.getSerializationConfig()
+ .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+ timelineObjectReader = mapper.reader(TimelineMetrics.class);
+ }
+
+ public MetricsCollectorInterface(String collectorHost, String protocol, String port) {
+ this.collectorHost = collectorHost;
+ this.protocol = protocol;
+ this.port = port;
+ this.hostName = getDefaultLocalHostName();
+ }
+
+ public static String getDefaultLocalHostName() {
+
+ if (hostName != null) {
+ return hostName;
+ }
+
+ try {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ LOG.info("Error getting host address");
+ }
+ return null;
+ }
+
+ public void publish(List<MetricAnomaly> metricAnomalies) {
+ if (CollectionUtils.isNotEmpty(metricAnomalies)) {
+ LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
+ List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
+ if (!metricList.isEmpty()) {
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.setMetrics(metricList);
+ emitMetrics(timelineMetrics);
+ }
+ } else {
+ LOG.info("No anomalies to send.");
+ }
+ }
+
+ private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) {
+ List<TimelineMetric> metrics = new ArrayList<>();
+
+ if (metricAnomalies.isEmpty()) {
+ return metrics;
+ }
+
+ for (MetricAnomaly anomaly : metricAnomalies) {
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(anomaly.getMetricKey());
+ timelineMetric.setAppId(serviceName + "-" + anomaly.getMethodType());
+ timelineMetric.setInstanceId(null);
+ timelineMetric.setHostName(getDefaultLocalHostName());
+ timelineMetric.setStartTime(anomaly.getTimestamp());
+ HashMap<String, String> metadata = new HashMap<>();
+ metadata.put("method", anomaly.getMethodType());
+ metadata.put("anomaly-score", String.valueOf(anomaly.getAnomalyScore()));
+ timelineMetric.setMetadata(metadata);
+ TreeMap<Long,Double> metricValues = new TreeMap<>();
+ metricValues.put(anomaly.getTimestamp(), anomaly.getMetricValue());
+ timelineMetric.setMetricValues(metricValues);
+
+ metrics.add(timelineMetric);
+ }
+ return metrics;
+ }
+
+ public boolean emitMetrics(TimelineMetrics metrics) {
+ String connectUrl = constructTimelineMetricUri();
+ String jsonData = null;
+ LOG.info("EmitMetrics connectUrl = " + connectUrl);
+ try {
+ jsonData = mapper.writeValueAsString(metrics);
+ LOG.info(jsonData);
+ } catch (IOException e) {
+ LOG.error("Unable to parse metrics", e);
+ }
+ if (jsonData != null) {
+ return emitMetricsJson(connectUrl, jsonData);
+ }
+ return false;
+ }
+
+ private HttpURLConnection getConnection(String spec) throws IOException {
+ return (HttpURLConnection) new URL(spec).openConnection();
+ }
+
+ private boolean emitMetricsJson(String connectUrl, String jsonData) {
+ int timeout = 10000;
+ HttpURLConnection connection = null;
+ try {
+ if (connectUrl == null) {
+ throw new IOException("Unknown URL. Unable to connect to metrics collector.");
+ }
+ connection = getConnection(connectUrl);
+
+ connection.setRequestMethod("POST");
+ connection.setRequestProperty("Content-Type", "application/json");
+ connection.setRequestProperty("Connection", "Keep-Alive");
+ connection.setConnectTimeout(timeout);
+ connection.setReadTimeout(timeout);
+ connection.setDoOutput(true);
+
+ if (jsonData != null) {
+ try (OutputStream os = connection.getOutputStream()) {
+ os.write(jsonData.getBytes("UTF-8"));
+ }
+ }
+
+ int statusCode = connection.getResponseCode();
+
+ if (statusCode != 200) {
+ LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
+ "statusCode = " + statusCode);
+ } else {
+ LOG.info("Metrics posted to Collector " + connectUrl);
+ }
+ return true;
+ } catch (IOException ioe) {
+ LOG.error(ioe.getMessage());
+ }
+ return false;
+ }
+
+ private String constructTimelineMetricUri() {
+ StringBuilder sb = new StringBuilder(protocol);
+ sb.append("://");
+ sb.append(collectorHost);
+ sb.append(":");
+ sb.append(port);
+ sb.append(WS_V1_TIMELINE_METRICS);
+ return sb.toString();
+ }
+
+ public TimelineMetrics fetchMetrics(String metricName,
+ String appId,
+ String hostname,
+ long startime,
+ long endtime) {
+
+ String url = constructTimelineMetricUri() + "?metricNames=" + metricName + "&appId=" + appId +
+ "&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + endtime;
+ LOG.info("Fetch metrics URL : " + url);
+
+ URL obj = null;
+ BufferedReader in = null;
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+ try {
+ obj = new URL(url);
+ HttpURLConnection con = (HttpURLConnection) obj.openConnection();
+ con.setRequestMethod("GET");
+ int responseCode = con.getResponseCode();
+ LOG.info("Sending 'GET' request to URL : " + url);
+ LOG.info("Response Code : " + responseCode);
+
+ in = new BufferedReader(
+ new InputStreamReader(con.getInputStream()));
+ timelineMetrics = timelineObjectReader.readValue(in);
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ LOG.warn(e);
+ }
+ }
+ }
+
+ LOG.info("Fetched " + timelineMetrics.getMetrics().size() + " metrics.");
+ return timelineMetrics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
new file mode 100644
index 0000000..b4a8593
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaModel;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class PointInTimeADSystem implements Serializable {
+
+ //private EmaTechnique emaTechnique;
+ private MetricsCollectorInterface metricsCollectorInterface;
+ private Map<String, Double> tukeysNMap;
+ private double defaultTukeysN = 3;
+
+ private long testIntervalMillis = 5*60*1000; //10mins
+ private long trainIntervalMillis = 15*60*1000; //1hour
+
+ private static final Log LOG = LogFactory.getLog(PointInTimeADSystem.class);
+
+ private AmbariServerInterface ambariServerInterface;
+ private int sensitivity = 50;
+ private int minSensitivity = 0;
+ private int maxSensitivity = 10;
+
+ public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN,
+ long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) {
+ this.metricsCollectorInterface = metricsCollectorInterface;
+ this.defaultTukeysN = defaultTukeysN;
+ this.tukeysNMap = new HashMap<>();
+ this.testIntervalMillis = testIntervalMillis;
+ this.trainIntervalMillis = trainIntervalMillis;
+ this.ambariServerInterface = new AmbariServerInterface(ambariServerHost, clusterName);
+ LOG.info("Starting PointInTimeADSystem...");
+ }
+
+ public void runTukeysAndRefineEma(EmaTechnique emaTechnique, long startTime) {
+ LOG.info("Running Tukeys for test data interval [" + new Date(startTime - testIntervalMillis) + " : " + new Date(startTime) + "], with train data period [" + new Date(startTime - testIntervalMillis - trainIntervalMillis) + " : " + new Date(startTime - testIntervalMillis) + "]");
+
+ int requiredSensivity = ambariServerInterface.getPointInTimeSensitivity();
+ if (requiredSensivity == -1 || requiredSensivity == sensitivity) {
+ LOG.info("No change in sensitivity needed.");
+ } else {
+ LOG.info("Current tukey's N value = " + defaultTukeysN);
+ if (requiredSensivity > sensitivity) {
+ int targetSensitivity = Math.min(maxSensitivity, requiredSensivity);
+ while (sensitivity < targetSensitivity) {
+ defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.1;
+ sensitivity++;
+ }
+ } else {
+ int targetSensitivity = Math.max(minSensitivity, requiredSensivity);
+ while (sensitivity > targetSensitivity) {
+ defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.1;
+ sensitivity--;
+ }
+ }
+ LOG.info("New tukey's N value = " + defaultTukeysN);
+ }
+
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ for (String metricKey : emaTechnique.getTrackedEmas().keySet()) {
+ LOG.info("EMA key = " + metricKey);
+ EmaModel emaModel = emaTechnique.getTrackedEmas().get(metricKey);
+ String metricName = emaModel.getMetricName();
+ String appId = emaModel.getAppId();
+ String hostname = emaModel.getHostname();
+
+ TimelineMetrics tukeysData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, startTime - (testIntervalMillis + trainIntervalMillis),
+ startTime);
+
+ if (tukeysData.getMetrics().isEmpty()) {
+ LOG.info("No metrics fetched for Tukeys, metricKey = " + metricKey);
+ continue;
+ }
+
+ List<Double> trainTsList = new ArrayList<>();
+ List<Double> trainDataList = new ArrayList<>();
+ List<Double> testTsList = new ArrayList<>();
+ List<Double> testDataList = new ArrayList<>();
+
+ for (TimelineMetric metric : tukeysData.getMetrics()) {
+ for (Long timestamp : metric.getMetricValues().keySet()) {
+ if (timestamp <= (startTime - testIntervalMillis)) {
+ trainDataList.add(metric.getMetricValues().get(timestamp));
+ trainTsList.add((double)timestamp);
+ } else {
+ testDataList.add(metric.getMetricValues().get(timestamp));
+ testTsList.add((double)timestamp);
+ }
+ }
+ }
+
+ if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+ LOG.info("Not enough train/test data to perform analysis.");
+ continue;
+ }
+
+ String tukeysTrainSeries = "tukeysTrainSeries";
+ double[] trainTs = new double[trainTsList.size()];
+ double[] trainData = new double[trainTsList.size()];
+ for (int i = 0; i < trainTs.length; i++) {
+ trainTs[i] = trainTsList.get(i);
+ trainData[i] = trainDataList.get(i);
+ }
+
+ String tukeysTestSeries = "tukeysTestSeries";
+ double[] testTs = new double[testTsList.size()];
+ double[] testData = new double[testTsList.size()];
+ for (int i = 0; i < testTs.length; i++) {
+ testTs[i] = testTsList.get(i);
+ testData[i] = testDataList.get(i);
+ }
+
+ LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
+
+ DataSeries tukeysTrainData = new DataSeries(tukeysTrainSeries, trainTs, trainData);
+ DataSeries tukeysTestData = new DataSeries(tukeysTestSeries, testTs, testData);
+
+ if (!tukeysNMap.containsKey(metricKey)) {
+ tukeysNMap.put(metricKey, defaultTukeysN);
+ }
+
+ Map<String, String> configs = new HashMap<>();
+ configs.put("tukeys.n", String.valueOf(tukeysNMap.get(metricKey)));
+
+ ResultSet rs = RFunctionInvoker.tukeys(tukeysTrainData, tukeysTestData, configs);
+
+ List<TimelineMetric> tukeysMetrics = getAsTimelineMetric(rs, metricName, appId, hostname);
+ LOG.info("Tukeys anomalies size : " + tukeysMetrics.size());
+ TreeMap<Long, Double> tukeysMetricValues = new TreeMap<>();
+
+ for (TimelineMetric tukeysMetric : tukeysMetrics) {
+ tukeysMetricValues.putAll(tukeysMetric.getMetricValues());
+ timelineMetrics.addOrMergeTimelineMetric(tukeysMetric);
+ }
+
+ TimelineMetrics emaData = metricsCollectorInterface.fetchMetrics(metricKey, MetricsCollectorInterface.serviceName+"-ema", MetricsCollectorInterface.getDefaultLocalHostName(), startTime - testIntervalMillis, startTime);
+ TreeMap<Long, Double> emaMetricValues = new TreeMap();
+ if (!emaData.getMetrics().isEmpty()) {
+ emaMetricValues = emaData.getMetrics().get(0).getMetricValues();
+ }
+
+ LOG.info("Ema anomalies size : " + emaMetricValues.size());
+ int tp = 0;
+ int tn = 0;
+ int fp = 0;
+ int fn = 0;
+
+ for (double ts : testTs) {
+ long timestamp = (long) ts;
+ if (tukeysMetricValues.containsKey(timestamp)) {
+ if (emaMetricValues.containsKey(timestamp)) {
+ tp++;
+ } else {
+ fn++;
+ }
+ } else {
+ if (emaMetricValues.containsKey(timestamp)) {
+ fp++;
+ } else {
+ tn++;
+ }
+ }
+ }
+
+ double recall = (double) tp / (double) (tp + fn);
+ double precision = (double) tp / (double) (tp + fp);
+ LOG.info("----------------------------");
+ LOG.info("Precision Recall values for " + metricKey);
+ LOG.info("tp=" + tp + ", fp=" + fp + ", tn=" + tn + ", fn=" + fn);
+ LOG.info("----------------------------");
+
+ if (recall < 0.5) {
+ LOG.info("Increasing EMA sensitivity by 10%");
+ emaModel.updateModel(true, 10);
+ } else if (precision < 0.5) {
+ LOG.info("Decreasing EMA sensitivity by 10%");
+ emaModel.updateModel(false, 10);
+ }
+
+ }
+
+ if (emaTechnique.getTrackedEmas().isEmpty()){
+ LOG.info("No EMA Technique keys tracked!!!!");
+ }
+
+ if (!timelineMetrics.getMetrics().isEmpty()) {
+ metricsCollectorInterface.emitMetrics(timelineMetrics);
+ }
+ }
+
+ private static List<TimelineMetric> getAsTimelineMetric(ResultSet result, String metricName, String appId, String hostname) {
+
+ List<TimelineMetric> timelineMetrics = new ArrayList<>();
+
+ if (result == null) {
+ LOG.info("ResultSet from R call is null!!");
+ return null;
+ }
+
+ if (result.resultset.size() > 0) {
+ double[] ts = result.resultset.get(0);
+ double[] metrics = result.resultset.get(1);
+ double[] anomalyScore = result.resultset.get(2);
+ for (int i = 0; i < ts.length; i++) {
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(metricName + "_" + appId + "_" + hostname);
+ timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+ timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys");
+ timelineMetric.setInstanceId(null);
+ timelineMetric.setStartTime((long) ts[i]);
+ TreeMap<Long, Double> metricValues = new TreeMap<>();
+ metricValues.put((long) ts[i], metrics[i]);
+
+ HashMap<String, String> metadata = new HashMap<>();
+ metadata.put("method", "tukeys");
+ metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
+ timelineMetric.setMetadata(metadata);
+
+ timelineMetric.setMetricValues(metricValues);
+ timelineMetrics.add(timelineMetric);
+ }
+ }
+
+ return timelineMetrics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java
new file mode 100644
index 0000000..4fdf27d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/RFunctionInvoker.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.rosuda.JRI.REXP;
+import org.rosuda.JRI.RVector;
+import org.rosuda.JRI.Rengine;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class RFunctionInvoker {
+
+ static final Log LOG = LogFactory.getLog(RFunctionInvoker.class);
+ public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null);
+ private static String rScriptDir = "/usr/lib/ambari-metrics-collector/R-scripts";
+
+ private static void loadDataSets(Rengine r, DataSeries trainData, DataSeries testData) {
+ r.assign("train_ts", trainData.ts);
+ r.assign("train_x", trainData.values);
+ r.eval("train_data <- data.frame(train_ts,train_x)");
+ r.eval("names(train_data) <- c(\"TS\", " + trainData.seriesName + ")");
+
+ r.assign("test_ts", testData.ts);
+ r.assign("test_x", testData.values);
+ r.eval("test_data <- data.frame(test_ts,test_x)");
+ r.eval("names(test_data) <- c(\"TS\", " + testData.seriesName + ")");
+ }
+
+ public static void setScriptsDir(String dir) {
+ rScriptDir = dir;
+ }
+
+ public static ResultSet executeMethod(String methodType, DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+
+ ResultSet result;
+ switch (methodType) {
+ case "tukeys":
+ result = tukeys(trainData, testData, configs);
+ break;
+ case "ema":
+ result = ema_global(trainData, testData, configs);
+ break;
+ case "ks":
+ result = ksTest(trainData, testData, configs);
+ break;
+ case "hsdev":
+ result = hsdev(trainData, testData, configs);
+ break;
+ default:
+ result = tukeys(trainData, testData, configs);
+ break;
+ }
+ return result;
+ }
+
+ public static ResultSet tukeys(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+ try {
+
+ REXP exp1 = r.eval("source('" + rScriptDir + "/tukeys.r" + "')");
+
+ double n = Double.parseDouble(configs.get("tukeys.n"));
+ r.eval("n <- " + n);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ams_tukeys(train_data, test_data, n)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i < cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet ema_global(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+ try {
+ r.eval("source('" + rScriptDir + "/ema.r" + "')");
+
+ int n = Integer.parseInt(configs.get("ema.n"));
+ r.eval("n <- " + n);
+
+ double w = Double.parseDouble(configs.get("ema.w"));
+ r.eval("w <- " + w);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ema_global(train_data, test_data, w, n)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i < cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet ema_daily(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+ try {
+ r.eval("source('" + rScriptDir + "/ema.r" + "')");
+
+ int n = Integer.parseInt(configs.get("ema.n"));
+ r.eval("n <- " + n);
+
+ double w = Double.parseDouble(configs.get("ema.w"));
+ r.eval("w <- " + w);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ema_daily(train_data, test_data, w, n)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i < cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet ksTest(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+ try {
+ r.eval("source('" + rScriptDir + "/kstest.r" + "')");
+
+ double p_value = Double.parseDouble(configs.get("ks.p_value"));
+ r.eval("p_value <- " + p_value);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an <- ams_ks(train_data, test_data, p_value)");
+ REXP exp = r.eval("an");
+ RVector cont = (RVector) exp.getContent();
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i < cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+
+ public static ResultSet hsdev(DataSeries trainData, DataSeries testData, Map<String, String> configs) {
+ try {
+ r.eval("source('" + rScriptDir + "/hsdev.r" + "')");
+
+ int n = Integer.parseInt(configs.get("hsdev.n"));
+ r.eval("n <- " + n);
+
+ int nhp = Integer.parseInt(configs.get("hsdev.nhp"));
+ r.eval("nhp <- " + nhp);
+
+ long interval = Long.parseLong(configs.get("hsdev.interval"));
+ r.eval("interval <- " + interval);
+
+ long period = Long.parseLong(configs.get("hsdev.period"));
+ r.eval("period <- " + period);
+
+ loadDataSets(r, trainData, testData);
+
+ r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)");
+ REXP exp = r.eval("an2");
+ RVector cont = (RVector) exp.getContent();
+
+ List<double[]> result = new ArrayList();
+ for (int i = 0; i < cont.size(); i++) {
+ result.add(cont.at(i).asDoubleArray());
+ }
+ return new ResultSet(result);
+ } catch (Exception e) {
+ LOG.error(e);
+ } finally {
+ r.end();
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java
new file mode 100644
index 0000000..7485f01
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TestSeriesInputRequest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Request describing a synthetic test series: its name, type and type-specific configs.
+ * Two requests are considered equal when their series names are equal.
+ */
+@XmlRootElement
+public class TestSeriesInputRequest {
+
+  private String seriesName;
+  private String seriesType;
+  private Map<String, String> configs;
+
+  public TestSeriesInputRequest() {
+  }
+
+  public TestSeriesInputRequest(String seriesName, String seriesType, Map<String, String> configs) {
+    this.seriesName = seriesName;
+    this.seriesType = seriesType;
+    this.configs = configs;
+  }
+
+  public String getSeriesName() {
+    return seriesName;
+  }
+
+  public void setSeriesName(String seriesName) {
+    this.seriesName = seriesName;
+  }
+
+  public String getSeriesType() {
+    return seriesType;
+  }
+
+  public void setSeriesType(String seriesType) {
+    this.seriesType = seriesType;
+  }
+
+  public Map<String, String> getConfigs() {
+    return configs;
+  }
+
+  public void setConfigs(Map<String, String> configs) {
+    this.configs = configs;
+  }
+
+  /**
+   * Equality is based on the series name only. Returns {@code false} for {@code null} or
+   * objects of other types, as required by the {@link Object#equals} contract.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof TestSeriesInputRequest)) {
+      return false;
+    }
+    String otherName = ((TestSeriesInputRequest) o).getSeriesName();
+    String myName = this.getSeriesName();
+    return myName == null ? otherName == null : myName.equals(otherName);
+  }
+
+  /** Consistent with {@link #equals}: derived from the series name only (0 when null). */
+  @Override
+  public int hashCode() {
+    String name = getSeriesName();
+    return name == null ? 0 : name.hashCode();
+  }
+
+  public static void main(String[] args) {
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    TestSeriesInputRequest testSeriesInputRequest = new TestSeriesInputRequest("test", "ema", Collections.singletonMap("key","value"));
+    try {
+      System.out.print(objectMapper.writeValueAsString(testSeriesInputRequest));
+    } catch (JsonProcessingException e) {
+      e.printStackTrace();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
new file mode 100644
index 0000000..1534b55
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.hsdev.HsdevTechnique;
+import org.apache.ambari.metrics.alertservice.prototype.methods.kstest.KSTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Trend anomaly detection driver.
+ *
+ * <p>{@link #runKSTest(long)} runs a KS test over a recent window for every metric listed in the
+ * input file (one {@code metricName,appId,hostname} entry per line) and records the anomalies it
+ * finds. {@link #runHsdevMethod()} then re-checks each recorded KS anomaly with the Hsdev technique
+ * against preceding historical periods, publishing confirmed anomalies and feeding mismatches back
+ * to the KS model.
+ */
+public class TrendADSystem implements Serializable {
+
+  private MetricsCollectorInterface metricsCollectorInterface;
+  private List<TrendMetric> trendMetrics;
+
+  private long ksTestIntervalMillis = 10 * 60 * 1000;
+  private long ksTrainIntervalMillis = 10 * 60 * 1000;
+  private KSTechnique ksTechnique;
+
+  private HsdevTechnique hsdevTechnique;
+  private int hsdevNumHistoricalPeriods = 3;
+
+  private Map<KsSingleRunKey, MetricAnomaly> trackedKsAnomalies = new HashMap<>();
+  private static final Log LOG = LogFactory.getLog(TrendADSystem.class);
+  private String inputFile = "";
+
+  /**
+   * @param metricsCollectorInterface source for fetching metrics and sink for emitting anomalies
+   * @param ksTestIntervalMillis      length of the KS test window
+   * @param ksTrainIntervalMillis     length of the KS train window preceding the test window
+   * @param hsdevNumHistoricalPeriods number of historical periods used as Hsdev training data
+   * @param inputFileName             file listing the metrics to track
+   */
+  public TrendADSystem(MetricsCollectorInterface metricsCollectorInterface,
+                       long ksTestIntervalMillis,
+                       long ksTrainIntervalMillis,
+                       int hsdevNumHistoricalPeriods,
+                       String inputFileName) {
+
+    this.metricsCollectorInterface = metricsCollectorInterface;
+    this.ksTestIntervalMillis = ksTestIntervalMillis;
+    this.ksTrainIntervalMillis = ksTrainIntervalMillis;
+    this.hsdevNumHistoricalPeriods = hsdevNumHistoricalPeriods;
+
+    this.ksTechnique = new KSTechnique();
+    this.hsdevTechnique = new HsdevTechnique();
+
+    trendMetrics = new ArrayList<>();
+    this.inputFile = inputFileName;
+    readInputFile(inputFileName);
+  }
+
+  /**
+   * Runs the KS test for every tracked metric over the test window ending at
+   * {@code currentEndTime}, emitting and recording any anomaly found.
+   * The input file is re-read on each call so newly added metrics are picked up.
+   */
+  public void runKSTest(long currentEndTime) {
+    readInputFile(inputFile);
+
+    long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis;
+    long ksTrainIntervalStartTime = ksTestIntervalStartTime - ksTrainIntervalMillis;
+    LOG.info("Running KS Test for test data interval [" + new Date(ksTestIntervalStartTime) + " : " +
+      new Date(currentEndTime) + "], with train data period [" + new Date(ksTrainIntervalStartTime)
+      + " : " + new Date(ksTestIntervalStartTime) + "]");
+
+    for (TrendMetric metric : trendMetrics) {
+      String metricName = metric.metricName;
+      String appId = metric.appId;
+      String hostname = metric.hostname;
+      String key = metricName + "_" + appId + "_" + hostname;
+
+      TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname,
+        ksTrainIntervalStartTime, currentEndTime);
+
+      if (ksData == null || ksData.getMetrics() == null || ksData.getMetrics().isEmpty()) {
+        LOG.info("No metrics fetched for KS, metricKey = " + key);
+        continue;
+      }
+
+      DataSeries[] trainAndTest =
+        splitIntoTrainAndTest(ksData, ksTestIntervalStartTime, "KSTrainSeries", "KSTestSeries");
+      if (trainAndTest == null) {
+        LOG.info("Not enough train/test data to perform KS analysis.");
+        continue;
+      }
+      DataSeries ksTrainData = trainAndTest[0];
+      DataSeries ksTestData = trainAndTest[1];
+
+      LOG.info("Train Size = " + ksTrainData.ts.length + ", Test Size = " + ksTestData.ts.length);
+
+      MetricAnomaly metricAnomaly = ksTechnique.runKsTest(key, ksTrainData, ksTestData);
+      if (metricAnomaly == null) {
+        LOG.info("No anomaly from KS test.");
+      } else {
+        LOG.info("Found Anomaly in KS Test. Publishing KS Anomaly metric....");
+        TimelineMetric timelineMetric = getAsTimelineMetric(metricAnomaly,
+          ksTestIntervalStartTime, currentEndTime, ksTrainIntervalStartTime, ksTestIntervalStartTime);
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        metricsCollectorInterface.emitMetrics(timelineMetrics);
+
+        trackedKsAnomalies.put(new KsSingleRunKey(ksTestIntervalStartTime, currentEndTime, metricName, appId, hostname), metricAnomaly);
+      }
+    }
+
+    if (trendMetrics.isEmpty()) {
+      LOG.info("No Trend metrics tracked!!!!");
+    }
+
+  }
+
+  /**
+   * Splits every data point of {@code data} into a train series (timestamp &lt;= splitTime) and
+   * a test series (timestamp &gt; splitTime).
+   *
+   * @return {train, test}, or {@code null} when either side is empty or the train side has fewer
+   *         points than the test side
+   */
+  private static DataSeries[] splitIntoTrainAndTest(TimelineMetrics data, long splitTime,
+                                                    String trainSeriesName, String testSeriesName) {
+    List<Double> trainTsList = new ArrayList<>();
+    List<Double> trainDataList = new ArrayList<>();
+    List<Double> testTsList = new ArrayList<>();
+    List<Double> testDataList = new ArrayList<>();
+
+    for (TimelineMetric timelineMetric : data.getMetrics()) {
+      for (Map.Entry<Long, Double> point : timelineMetric.getMetricValues().entrySet()) {
+        long timestamp = point.getKey();
+        if (timestamp <= splitTime) {
+          trainDataList.add(point.getValue());
+          trainTsList.add((double) timestamp);
+        } else {
+          testDataList.add(point.getValue());
+          testTsList.add((double) timestamp);
+        }
+      }
+    }
+
+    if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
+      return null;
+    }
+
+    return new DataSeries[]{
+      new DataSeries(trainSeriesName, toPrimitiveArray(trainTsList), toPrimitiveArray(trainDataList)),
+      new DataSeries(testSeriesName, toPrimitiveArray(testTsList), toPrimitiveArray(testDataList))
+    };
+  }
+
+  /** Unboxes a list of Doubles into a primitive array. */
+  private static double[] toPrimitiveArray(List<Double> values) {
+    double[] out = new double[values.size()];
+    for (int i = 0; i < out.length; i++) {
+      out[i] = values.get(i);
+    }
+    return out;
+  }
+
+  /**
+   * Builds the anomaly metric published to the collector: a single point at {@code testEnd}
+   * carrying the anomaly's value, with the method, score and time windows in the metadata.
+   */
+  private TimelineMetric getAsTimelineMetric(MetricAnomaly metricAnomaly,
+                                             long testStart,
+                                             long testEnd,
+                                             long trainStart,
+                                             long trainEnd) {
+
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName(metricAnomaly.getMetricKey());
+    timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-" + metricAnomaly.getMethodType());
+    timelineMetric.setInstanceId(null);
+    timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+    timelineMetric.setStartTime(testEnd);
+    HashMap<String, String> metadata = new HashMap<>();
+    metadata.put("method", metricAnomaly.getMethodType());
+    metadata.put("anomaly-score", String.valueOf(metricAnomaly.getAnomalyScore()));
+    metadata.put("test-start-time", String.valueOf(testStart));
+    metadata.put("train-start-time", String.valueOf(trainStart));
+    metadata.put("train-end-time", String.valueOf(trainEnd));
+    timelineMetric.setMetadata(metadata);
+    TreeMap<Long, Double> metricValues = new TreeMap<>();
+    metricValues.put(testEnd, metricAnomaly.getMetricValue());
+    timelineMetric.setMetricValues(metricValues);
+    return timelineMetric;
+
+  }
+
+  /**
+   * Re-checks every KS anomaly recorded since the last call with the Hsdev technique, using
+   * {@code hsdevNumHistoricalPeriods} windows of the same length immediately preceding the KS
+   * test window as training data. Confirmed anomalies are published; mismatches update the KS
+   * model. The tracked KS anomalies are cleared afterwards.
+   */
+  public void runHsdevMethod() {
+
+    List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>();
+
+    for (KsSingleRunKey ksSingleRunKey : trackedKsAnomalies.keySet()) {
+
+      long hsdevTestEnd = ksSingleRunKey.endTime;
+      long hsdevTestStart = ksSingleRunKey.startTime;
+
+      long period = hsdevTestEnd - hsdevTestStart;
+
+      long hsdevTrainStart = hsdevTestStart - (hsdevNumHistoricalPeriods) * period;
+      long hsdevTrainEnd = hsdevTestStart;
+
+      LOG.info("Running HSdev Test for test data interval [" + new Date(hsdevTestStart) + " : " +
+        new Date(hsdevTestEnd) + "], with train data period [" + new Date(hsdevTrainStart)
+        + " : " + new Date(hsdevTrainEnd) + "]");
+
+      String metricName = ksSingleRunKey.metricName;
+      String appId = ksSingleRunKey.appId;
+      String hostname = ksSingleRunKey.hostname;
+      String key = metricName + "_" + appId + "_" + hostname;
+
+      TimelineMetrics hsdevData = metricsCollectorInterface.fetchMetrics(
+        metricName,
+        appId,
+        hostname,
+        hsdevTrainStart,
+        hsdevTestEnd);
+
+      if (hsdevData == null || hsdevData.getMetrics() == null || hsdevData.getMetrics().isEmpty()) {
+        LOG.info("No metrics fetched for HSDev, metricKey = " + key);
+        continue;
+      }
+
+      DataSeries[] trainAndTest =
+        splitIntoTrainAndTest(hsdevData, hsdevTestStart, "HsdevTrainSeries", "HsdevTestSeries");
+      if (trainAndTest == null) {
+        LOG.info("Not enough train/test data to perform Hsdev analysis.");
+        continue;
+      }
+      DataSeries hsdevTrainData = trainAndTest[0];
+      DataSeries hsdevTestData = trainAndTest[1];
+
+      LOG.info("Train Size = " + hsdevTrainData.ts.length + ", Test Size = " + hsdevTestData.ts.length);
+
+      MetricAnomaly metricAnomaly = hsdevTechnique.runHsdevTest(key, hsdevTrainData, hsdevTestData);
+      if (metricAnomaly == null) {
+        LOG.info("No anomaly from Hsdev test. Mismatch between KS and HSDev. ");
+        ksTechnique.updateModel(key, false, 10);
+      } else {
+        LOG.info("Found Anomaly in Hsdev Test. This confirms KS anomaly.");
+        hsdevMetricAnomalies.add(getAsTimelineMetric(metricAnomaly,
+          hsdevTestStart, hsdevTestEnd, hsdevTrainStart, hsdevTrainEnd));
+      }
+    }
+    clearTrackedKsRunKeys();
+
+    if (!hsdevMetricAnomalies.isEmpty()) {
+      LOG.info("Publishing Hsdev Anomalies....");
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(hsdevMetricAnomalies);
+      metricsCollectorInterface.emitMetrics(timelineMetrics);
+    }
+  }
+
+  private void clearTrackedKsRunKeys() {
+    trackedKsAnomalies.clear();
+  }
+
+  /**
+   * Replaces the tracked metrics with the entries of {@code fileName}, one
+   * {@code metricName,appId,hostname} per line. Blank and malformed lines are skipped so a bad
+   * entry cannot abort the whole run.
+   */
+  private void readInputFile(String fileName) {
+    trendMetrics.clear();
+    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
+      for (String line; (line = br.readLine()) != null; ) {
+        if (line.trim().isEmpty()) {
+          continue;
+        }
+        String[] splits = line.split(",");
+        if (splits.length < 3) {
+          LOG.warn("Skipping malformed line in Trend AD input file " + fileName + " : " + line);
+          continue;
+        }
+        LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]);
+        trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2]));
+      }
+    } catch (IOException e) {
+      LOG.error("Error reading input file : " + e, e);
+    }
+  }
+
+  /**
+   * Identifies one KS run (test window and metric) whose anomaly awaits Hsdev confirmation.
+   * Static so it does not capture (and serialize) the enclosing TrendADSystem.
+   */
+  static class KsSingleRunKey implements Serializable {
+
+    long startTime;
+    long endTime;
+    String metricName;
+    String appId;
+    String hostname;
+
+    public KsSingleRunKey(long startTime, long endTime, String metricName, String appId, String hostname) {
+      this.startTime = startTime;
+      this.endTime = endTime;
+      this.metricName = metricName;
+      this.appId = appId;
+      this.hostname = hostname;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof KsSingleRunKey)) {
+        return false;
+      }
+      KsSingleRunKey other = (KsSingleRunKey) o;
+      return startTime == other.startTime
+        && endTime == other.endTime
+        && Objects.equals(metricName, other.metricName)
+        && Objects.equals(appId, other.appId)
+        && Objects.equals(hostname, other.hostname);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(startTime, endTime, metricName, appId, hostname);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendMetric.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendMetric.java
new file mode 100644
index 0000000..3bead8b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendMetric.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import java.io.Serializable;
+
+/**
+ * A metric tracked by the trend anomaly detection system, identified by its
+ * metric name, application id and host name.
+ */
+public class TrendMetric implements Serializable {
+
+  String metricName;
+  String appId;
+  String hostname;
+
+  public TrendMetric(String name, String application, String host) {
+    metricName = name;
+    appId = application;
+    hostname = host;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
new file mode 100644
index 0000000..eb19857
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.common;
+
+import java.util.Arrays;
+
+public class DataSeries {
+
+ public String seriesName;
+ public double[] ts;
+ public double[] values;
+
+ public DataSeries(String seriesName, double[] ts, double[] values) {
+ this.seriesName = seriesName;
+ this.ts = ts;
+ this.values = values;
+ }
+
+ @Override
+ public String toString() {
+ return seriesName + Arrays.toString(ts) + Arrays.toString(values);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
new file mode 100644
index 0000000..101b0e9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.common;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ResultSet {
+
+ public List<double[]> resultset = new ArrayList<>();
+
+ public ResultSet(List<double[]> resultset) {
+ this.resultset = resultset;
+ }
+
+ public void print() {
+ System.out.println("Result : ");
+ if (!resultset.isEmpty()) {
+ for (int i = 0; i<resultset.get(0).length;i++) {
+ for (double[] entity : resultset) {
+ System.out.print(entity[i] + " ");
+ }
+ System.out.println();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
new file mode 100644
index 0000000..4ea4ac5
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.common;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Basic descriptive statistics over {@code double[]} samples. Empty input yields {@code NaN}.
+ */
+public class StatisticUtils {
+
+  /** Returns the arithmetic mean, or {@code NaN} for an empty array. */
+  public static double mean(double[] values) {
+    double sum = 0;
+    for (double d : values) {
+      sum += d;
+    }
+    return sum / values.length;
+  }
+
+  /**
+   * Returns the sum of squared deviations from the mean.
+   *
+   * <p>NOTE: despite its name this value is NOT divided by the sample size; {@link #sdev}
+   * performs that division. Divide by {@code values.length} (or {@code length - 1}) to obtain
+   * the population (or sample) variance.
+   */
+  public static double variance(double[] values) {
+    double avg = mean(values);
+    double variance = 0;
+    for (double d : values) {
+      variance += Math.pow(d - avg, 2.0);
+    }
+    return variance;
+  }
+
+  /**
+   * Returns the standard deviation.
+   *
+   * @param useBesselsCorrection divide by {@code n - 1} (sample) instead of {@code n} (population)
+   * @return the standard deviation, or {@code NaN} for an empty array (and for a single value
+   *         when Bessel's correction is used)
+   */
+  public static double sdev(double[] values, boolean useBesselsCorrection) {
+    if (values.length == 0) {
+      return Double.NaN;
+    }
+    double variance = variance(values);
+    int n = (useBesselsCorrection) ? values.length - 1 : values.length;
+    return Math.sqrt(variance / n);
+  }
+
+  /**
+   * Returns the median without modifying {@code values}: the middle element for an odd count,
+   * the mean of the two middle elements for an even count, and {@code NaN} for an empty array.
+   */
+  public static double median(double[] values) {
+    int n = values.length;
+    if (n == 0) {
+      return Double.NaN;
+    }
+    double[] sorted = Arrays.copyOf(values, n);
+    Arrays.sort(sorted);
+    int mid = n / 2;
+    if (n % 2 != 0) {
+      return sorted[mid];
+    }
+    return (sorted[mid - 1] + sorted[mid]) / 2;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
new file mode 100644
index 0000000..0b10b4b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.sql.Time;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AnomalyDetectionTechnique {
+
+ protected String methodType;
+
+ public abstract List<MetricAnomaly> test(TimelineMetric metric);
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
new file mode 100644
index 0000000..da4f030
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetricAnomaly implements Serializable{
+
+ private String methodType;
+ private double anomalyScore;
+ private String metricKey;
+ private long timestamp;
+ private double metricValue;
+
+
+ public MetricAnomaly(String metricKey, long timestamp, double metricValue, String methodType, double anomalyScore) {
+ this.metricKey = metricKey;
+ this.timestamp = timestamp;
+ this.metricValue = metricValue;
+ this.methodType = methodType;
+ this.anomalyScore = anomalyScore;
+
+ }
+
+ public String getMethodType() {
+ return methodType;
+ }
+
+ public void setMethodType(String methodType) {
+ this.methodType = methodType;
+ }
+
+ public double getAnomalyScore() {
+ return anomalyScore;
+ }
+
+ public void setAnomalyScore(double anomalyScore) {
+ this.anomalyScore = anomalyScore;
+ }
+
+ public void setMetricKey(String metricKey) {
+ this.metricKey = metricKey;
+ }
+
+ public String getMetricKey() {
+ return metricKey;
+ }
+
+ public void setMetricName(String metricName) {
+ this.metricKey = metricName;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public double getMetricValue() {
+ return metricValue;
+ }
+
+ public void setMetricValue(double metricValue) {
+ this.metricValue = metricValue;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
new file mode 100644
index 0000000..5e1f76b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods.ema;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+
+@XmlRootElement
+public class EmaModel implements Serializable {
+
+ private String metricName;
+ private String hostname;
+ private String appId;
+ private double ema;
+ private double ems;
+ private double weight;
+ private double timessdev;
+
+ private int ctr = 0;
+ private static final int suppressAnomaliesTheshold = 30;
+
+ private static final Log LOG = LogFactory.getLog(EmaModel.class);
+
+ public EmaModel(String name, String hostname, String appId, double weight, double timessdev) {
+ this.metricName = name;
+ this.hostname = hostname;
+ this.appId = appId;
+ this.weight = weight;
+ this.timessdev = timessdev;
+ this.ema = 0.0;
+ this.ems = 0.0;
+ }
+
+ public String getMetricName() {
+ return metricName;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public double testAndUpdate(double metricValue) {
+
+ double anomalyScore = 0.0;
+ if (ctr > suppressAnomaliesTheshold) {
+ anomalyScore = test(metricValue);
+ }
+ if (Math.abs(anomalyScore) < 2 * timessdev) {
+ update(metricValue);
+ } else {
+ LOG.info("Not updating model for this value");
+ }
+ ctr++;
+ LOG.info("Counter : " + ctr);
+ LOG.info("Anomaly Score for " + metricValue + " : " + anomalyScore);
+ return anomalyScore;
+ }
+
+ public void update(double metricValue) {
+ ema = weight * ema + (1 - weight) * metricValue;
+ ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0));
+ LOG.info("In update : ema = " + ema + ", ems = " + ems);
+ }
+
+ public double test(double metricValue) {
+ LOG.info("In test : ema = " + ema + ", ems = " + ems);
+ double diff = Math.abs(ema - metricValue) - (timessdev * ems);
+ LOG.info("diff = " + diff);
+ if (diff > 0) {
+ return Math.abs((metricValue - ema) / ems); //Z score
+ } else {
+ return 0.0;
+ }
+ }
+
+ public void updateModel(boolean increaseSensitivity, double percent) {
+ LOG.info("Updating model for " + metricName + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent);
+ double delta = percent / 100;
+ if (increaseSensitivity) {
+ delta = delta * -1;
+ }
+ this.timessdev = timessdev + delta * timessdev;
+ this.weight = Math.min(1.0, weight + delta * weight);
+ LOG.info("New model parameters " + metricName + " : timessdev = " + timessdev + ", weight = " + weight);
+ }
+
+ public double getWeight() {
+ return weight;
+ }
+
+ public void setWeight(double weight) {
+ this.weight = weight;
+ }
+
+ public double getTimessdev() {
+ return timessdev;
+ }
+
+ public void setTimessdev(double timessdev) {
+ this.timessdev = timessdev;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
new file mode 100644
index 0000000..62749c1
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods.ema;
+
+import com.google.gson.Gson;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.spark.SparkContext;
+import org.apache.spark.mllib.util.Loader;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+public class EmaModelLoader implements Loader<EmaTechnique> {
+ private static final Log LOG = LogFactory.getLog(EmaModelLoader.class);
+
+ @Override
+ public EmaTechnique load(SparkContext sc, String path) {
+ return new EmaTechnique(0.5,3);
+// Gson gson = new Gson();
+// try {
+// String fileString = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
+// return gson.fromJson(fileString, EmaTechnique.class);
+// } catch (IOException e) {
+// LOG.error(e);
+// }
+// return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
new file mode 100644
index 0000000..c005e6f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods.ema;
+
+import com.google.gson.Gson;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.AnomalyDetectionTechnique;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.spark.SparkContext;
+import org.apache.spark.mllib.util.Saveable;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@XmlRootElement
+public class EmaTechnique extends AnomalyDetectionTechnique implements Serializable, Saveable {
+
+ @XmlElement(name = "trackedEmas")
+ private Map<String, EmaModel> trackedEmas;
+ private static final Log LOG = LogFactory.getLog(EmaTechnique.class);
+
+ private double startingWeight = 0.5;
+ private double startTimesSdev = 3.0;
+ private String methodType = "ema";
+
+ public EmaTechnique(double startingWeight, double startTimesSdev) {
+ trackedEmas = new HashMap<>();
+ this.startingWeight = startingWeight;
+ this.startTimesSdev = startTimesSdev;
+ LOG.info("New EmaTechnique......");
+ }
+
+ public List<MetricAnomaly> test(TimelineMetric metric) {
+ String metricName = metric.getMetricName();
+ String appId = metric.getAppId();
+ String hostname = metric.getHostName();
+ String key = metricName + "_" + appId + "_" + hostname;
+
+ EmaModel emaModel = trackedEmas.get(key);
+ if (emaModel == null) {
+ LOG.info("EmaModel not present for " + key);
+ LOG.info("Number of tracked Emas : " + trackedEmas.size());
+ emaModel = new EmaModel(metricName, hostname, appId, startingWeight, startTimesSdev);
+ trackedEmas.put(key, emaModel);
+ } else {
+ LOG.info("EmaModel already present for " + key);
+ }
+
+ List<MetricAnomaly> anomalies = new ArrayList<>();
+
+ for (Long timestamp : metric.getMetricValues().keySet()) {
+ double metricValue = metric.getMetricValues().get(timestamp);
+ double anomalyScore = emaModel.testAndUpdate(metricValue);
+ if (anomalyScore > 0.0) {
+ LOG.info("Found anomaly for : " + key);
+ MetricAnomaly metricAnomaly = new MetricAnomaly(key, timestamp, metricValue, methodType, anomalyScore);
+ anomalies.add(metricAnomaly);
+ } else {
+ LOG.info("Discarding non-anomaly for : " + key);
+ }
+ }
+ return anomalies;
+ }
+
+ public boolean updateModel(TimelineMetric timelineMetric, boolean increaseSensitivity, double percent) {
+ String metricName = timelineMetric.getMetricName();
+ String appId = timelineMetric.getAppId();
+ String hostname = timelineMetric.getHostName();
+ String key = metricName + "_" + appId + "_" + hostname;
+
+
+ EmaModel emaModel = trackedEmas.get(key);
+
+ if (emaModel == null) {
+ LOG.warn("EMA Model for " + key + " not found");
+ return false;
+ }
+ emaModel.updateModel(increaseSensitivity, percent);
+
+ return true;
+ }
+
+ @Override
+ public void save(SparkContext sc, String path) {
+ Gson gson = new Gson();
+ try {
+ String json = gson.toJson(this);
+ try (Writer writer = new BufferedWriter(new OutputStreamWriter(
+ new FileOutputStream(path), "utf-8"))) {
+ writer.write(json);
+ }
+ } catch (IOException e) {
+ LOG.error(e);
+ }
+ }
+
+ @Override
+ public String formatVersion() {
+ return "1.0";
+ }
+
+ public Map<String, EmaModel> getTrackedEmas() {
+ return trackedEmas;
+ }
+
+ public double getStartingWeight() {
+ return startingWeight;
+ }
+
+ public double getStartTimesSdev() {
+ return startTimesSdev;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
new file mode 100644
index 0000000..50bf9f2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.methods.hsdev;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import static org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.median;
+import static org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.sdev;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HsdevTechnique implements Serializable {
+
+ private Map<String, Double> hsdevMap;
+ private String methodType = "hsdev";
+ private static final Log LOG = LogFactory.getLog(HsdevTechnique.class);
+
+ public HsdevTechnique() {
+ hsdevMap = new HashMap<>();
+ }
+
+ public MetricAnomaly runHsdevTest(String key, DataSeries trainData, DataSeries testData) {
+ int testLength = testData.values.length;
+ int trainLength = trainData.values.length;
+
+ if (trainLength < testLength) {
+ LOG.info("Not enough train data.");
+ return null;
+ }
+
+ if (!hsdevMap.containsKey(key)) {
+ hsdevMap.put(key, 3.0);
+ }
+
+ double n = hsdevMap.get(key);
+
+ double historicSd = sdev(trainData.values, false);
+ double historicMedian = median(trainData.values);
+ double currentMedian = median(testData.values);
+
+ double diff = Math.abs(currentMedian - historicMedian);
+ LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1]));
+ LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd);
+
+ if (diff > n * historicSd) {
+ double zScore = diff / historicSd;
+ LOG.info("Z Score of current series : " + zScore);
+ return new MetricAnomaly(key,
+ (long) testData.ts[testLength - 1],
+ testData.values[testLength - 1],
+ methodType,
+ zScore);
+ }
+ return null;
+ }
+
+}