You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2017/09/26 21:39:12 UTC
[1/2] ambari git commit: AMBARI-21106 : ML-Prototype: Detect
timeseries anomaly for a metric. (Refine PIT & Trend subsystems,
Integrate with AMS, Ambari Alerts.)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-ams 63e743557 -> a11d1033d
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/alerts/alert_point_in_time_metric_anomalies.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/alerts/alert_point_in_time_metric_anomalies.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/alerts/alert_point_in_time_metric_anomalies.py
new file mode 100644
index 0000000..154ce1c
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/alerts/alert_point_in_time_metric_anomalies.py
@@ -0,0 +1,185 @@
+#!/usr/bin/env python
+
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import json
+import urllib
+import time
+import os
+import ambari_commons.network as network
+import logging
+
+from ambari_agent.AmbariConfig import AmbariConfig
+
+RESULT_STATE_OK = 'OK'
+RESULT_STATE_CRITICAL = 'CRITICAL'
+RESULT_STATE_WARNING = 'WARNING'
+RESULT_STATE_UNKNOWN = 'UNKNOWN'
+RESULT_STATE_SKIPPED = 'SKIPPED'
+
+AMS_HTTP_POLICY = '{{ams-site/timeline.metrics.service.http.policy}}'
+METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY = '{{ams-site/timeline.metrics.service.webapp.address}}'
+METRICS_COLLECTOR_VIP_HOST_KEY = '{{cluster-env/metrics_collector_vip_host}}'
+METRICS_COLLECTOR_VIP_PORT_KEY = '{{cluster-env/metrics_collector_vip_port}}'
+
+INTERVAL_PARAM_KEY = 'interval'
+INTERVAL_PARAM_DEFAULT = 10
+
+NUM_ANOMALIES_KEY = 'num_anomalies'
+NUM_ANOMALIES_DEFAULT = 5
+
+SENSITIVITY_KEY = 'sensitivity'
+SENSITIVITY_DEFAULT = 5
+
+AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics/anomalies?%s"
+
+logger = logging.getLogger()
+
def get_tokens():
  """
  Return the configuration tokens this alert script needs.

  Each token is in the {{site/property}} form; the alert framework resolves
  them into the configurations dictionary passed to execute().
  """
  required_tokens = (
    METRICS_COLLECTOR_VIP_HOST_KEY,
    METRICS_COLLECTOR_VIP_PORT_KEY,
    METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY,
    AMS_HTTP_POLICY,
  )
  return required_tokens
+
+
def execute(configurations={}, parameters={}, host_name=None):
  """
  Query AMS for point-in-time (EMA) metric anomalies over the last
  `interval` minutes and map the result onto an Ambari alert state.

  Keyword arguments:
  configurations (dictionary): a mapping of configuration key to value
  parameters (dictionary): a mapping of script parameter key to value
  host_name (string): the name of this host where the alert is running

  Returns a tuple of (result_state, [result_label]).
  """
  if configurations is None:
    return (RESULT_STATE_UNKNOWN, ['There were no configurations supplied to the script.'])

  collector_host = host_name
  # AMS timestamps are in milliseconds.
  current_time = int(time.time()) * 1000

  # Script parameters, falling back to defaults when absent.
  interval = INTERVAL_PARAM_DEFAULT
  if INTERVAL_PARAM_KEY in parameters:
    interval = _coerce_to_integer(parameters[INTERVAL_PARAM_KEY])

  num_anomalies = NUM_ANOMALIES_DEFAULT
  if NUM_ANOMALIES_KEY in parameters:
    num_anomalies = _coerce_to_integer(parameters[NUM_ANOMALIES_KEY])

  # NOTE(review): sensitivity is parsed but never sent to the anomaly
  # endpoint; wire it into the request parameters once the API supports it.
  sensitivity = SENSITIVITY_DEFAULT
  if SENSITIVITY_KEY in parameters:
    sensitivity = _coerce_to_integer(parameters[SENSITIVITY_KEY])

  # Prefer the VIP host/port when both are configured; otherwise fall back
  # to the collector webapp address, which must look like "fqdn_hostname:port".
  if METRICS_COLLECTOR_VIP_HOST_KEY in configurations and METRICS_COLLECTOR_VIP_PORT_KEY in configurations:
    collector_host = configurations[METRICS_COLLECTOR_VIP_HOST_KEY]
    collector_port = int(configurations[METRICS_COLLECTOR_VIP_PORT_KEY])
  else:
    # ams-site/timeline.metrics.service.webapp.address is required
    if not METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY in configurations:
      return (RESULT_STATE_UNKNOWN,
              ['{0} is a required parameter for the script'.format(METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY)])
    else:
      collector_webapp_address = configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY].split(":")
      if valid_collector_webapp_address(collector_webapp_address):
        collector_port = int(collector_webapp_address[1])
      else:
        return (RESULT_STATE_UNKNOWN, ['{0} value should be set as "fqdn_hostname:port", but set to {1}'.format(
          METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY, configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY])])

  # Request one more than num_anomalies so we can tell "more than the
  # warning threshold" apart from "exactly the warning threshold".
  get_ema_anomalies_parameters = {
    "method": "ema",
    "startTime": current_time - interval * 60 * 1000,
    "endTime": current_time,
    "limit": num_anomalies + 1
  }

  encoded_get_metrics_parameters = urllib.urlencode(get_ema_anomalies_parameters)

  ams_collector_conf_dir = "/etc/ambari-metrics-collector/conf"
  metric_truststore_ca_certs = 'ca.pem'
  ca_certs = os.path.join(ams_collector_conf_dir,
                          metric_truststore_ca_certs)
  metric_collector_https_enabled = str(configurations[AMS_HTTP_POLICY]) == "HTTPS_ONLY"

  try:
    conn = network.get_http_connection(
      collector_host,
      int(collector_port),
      metric_collector_https_enabled,
      ca_certs,
      ssl_version=AmbariConfig.get_resolved_config().get_force_https_protocol_value()
    )
    conn.request("GET", AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
    response = conn.getresponse()
    data = response.read()
    logger.info("Data read from metric anomaly endpoint")
    logger.info(data)
    conn.close()
  except Exception as e:
    # Log the root cause; the alert label stays generic for operators.
    # (Matches the logging added to alert_metrics_deviation.py.)
    logger.info(str(e))
    return (RESULT_STATE_UNKNOWN, ["Unable to retrieve anomaly metrics from the Ambari Metrics service."])

  if response.status != 200:
    # Log the response body to aid debugging of non-200 replies.
    logger.info(str(data))
    return (RESULT_STATE_UNKNOWN, ["Unable to retrieve anomaly metrics from the Ambari Metrics service."])

  data_json = json.loads(data)
  length = len(data_json["metrics"])
  logger.info("Number of anomalies returned : {0}".format(length))

  if length == 0:
    alert_state = RESULT_STATE_OK
    alert_label = 'No point in time anomalies in the last {0} minutes.'.format(interval)
    logger.info(alert_label)
  elif length <= num_anomalies:
    # BUG FIX: previously compared against a hard-coded 5, ignoring the
    # configurable num_anomalies parameter.
    alert_state = RESULT_STATE_WARNING
    # TODO(review): hard-coded prototype dashboard host; make configurable.
    alert_label = "http://avijayan-ad-1.openstacklocal:3000/dashboard/script/scripted.js?anomalies=" + data
  else:
    alert_state = RESULT_STATE_CRITICAL
    alert_label = "http://avijayan-ad-1.openstacklocal:3000/dashboard/script/scripted.js?anomalies=" + data

  return (alert_state, [alert_label])
+
+
def valid_collector_webapp_address(webapp_address):
  """
  Validate a collector webapp address already split into its parts.

  The address is valid when it has exactly two components, the host part
  is not the loopback literal '127.0.0.1', and the port part is numeric.
  """
  if len(webapp_address) != 2:
    return False
  host_part, port_part = webapp_address
  return host_part != '127.0.0.1' and port_part.isdigit()
+
+
+def _coerce_to_integer(value):
+ """
+ Attempts to correctly coerce a value to an integer. For the case of an integer or a float,
+ this will essentially either NOOP or return a truncated value. If the parameter is a string,
+ then it will first attempt to be coerced from a integer, and failing that, a float.
+ :param value: the value to coerce
+ :return: the coerced value as an integer
+ """
+ try:
+ return int(value)
+ except ValueError:
+ return int(float(value))
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/alerts/alert_trend_metric_anomalies.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/alerts/alert_trend_metric_anomalies.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/alerts/alert_trend_metric_anomalies.py
new file mode 100644
index 0000000..8813d8e
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/alerts/alert_trend_metric_anomalies.py
@@ -0,0 +1,185 @@
+#!/usr/bin/env python
+
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import json
+import urllib
+import time
+import os
+import ambari_commons.network as network
+import logging
+
+from ambari_agent.AmbariConfig import AmbariConfig
+
+RESULT_STATE_OK = 'OK'
+RESULT_STATE_CRITICAL = 'CRITICAL'
+RESULT_STATE_WARNING = 'WARNING'
+RESULT_STATE_UNKNOWN = 'UNKNOWN'
+RESULT_STATE_SKIPPED = 'SKIPPED'
+
+AMS_HTTP_POLICY = '{{ams-site/timeline.metrics.service.http.policy}}'
+METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY = '{{ams-site/timeline.metrics.service.webapp.address}}'
+METRICS_COLLECTOR_VIP_HOST_KEY = '{{cluster-env/metrics_collector_vip_host}}'
+METRICS_COLLECTOR_VIP_PORT_KEY = '{{cluster-env/metrics_collector_vip_port}}'
+
+INTERVAL_PARAM_KEY = 'interval'
+INTERVAL_PARAM_DEFAULT = 10
+
+NUM_ANOMALIES_KEY = 'num_anomalies'
+NUM_ANOMALIES_DEFAULT = 5
+
+SENSITIVITY_KEY = 'sensitivity'
+SENSITIVITY_DEFAULT = 5
+
+AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics/anomalies?%s"
+
+logger = logging.getLogger()
+
def get_tokens():
  """
  Return the {{site/property}} tokens needed by this alert.

  The alert framework resolves these into the configurations dictionary
  that is handed to execute().
  """
  return (
    METRICS_COLLECTOR_VIP_HOST_KEY,
    METRICS_COLLECTOR_VIP_PORT_KEY,
    METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY,
    AMS_HTTP_POLICY,
  )
+
+
def execute(configurations={}, parameters={}, host_name=None):
  """
  Query AMS for trend (Kolmogorov-Smirnov) metric anomalies over the last
  `interval` minutes and map the result onto an Ambari alert state.

  Keyword arguments:
  configurations (dictionary): a mapping of configuration key to value
  parameters (dictionary): a mapping of script parameter key to value
  host_name (string): the name of this host where the alert is running

  Returns a tuple of (result_state, [result_label]).
  """
  if configurations is None:
    return (RESULT_STATE_UNKNOWN, ['There were no configurations supplied to the script.'])

  collector_host = host_name
  # AMS timestamps are in milliseconds.
  current_time = int(time.time()) * 1000

  # Script parameters, falling back to defaults when absent.
  interval = INTERVAL_PARAM_DEFAULT
  if INTERVAL_PARAM_KEY in parameters:
    interval = _coerce_to_integer(parameters[INTERVAL_PARAM_KEY])

  num_anomalies = NUM_ANOMALIES_DEFAULT
  if NUM_ANOMALIES_KEY in parameters:
    num_anomalies = _coerce_to_integer(parameters[NUM_ANOMALIES_KEY])

  # NOTE(review): sensitivity is parsed but never sent to the anomaly
  # endpoint; wire it into the request parameters once the API supports it.
  sensitivity = SENSITIVITY_DEFAULT
  if SENSITIVITY_KEY in parameters:
    sensitivity = _coerce_to_integer(parameters[SENSITIVITY_KEY])

  # Prefer the VIP host/port when both are configured; otherwise fall back
  # to the collector webapp address, which must look like "fqdn_hostname:port".
  if METRICS_COLLECTOR_VIP_HOST_KEY in configurations and METRICS_COLLECTOR_VIP_PORT_KEY in configurations:
    collector_host = configurations[METRICS_COLLECTOR_VIP_HOST_KEY]
    collector_port = int(configurations[METRICS_COLLECTOR_VIP_PORT_KEY])
  else:
    # ams-site/timeline.metrics.service.webapp.address is required
    if not METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY in configurations:
      return (RESULT_STATE_UNKNOWN,
              ['{0} is a required parameter for the script'.format(METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY)])
    else:
      collector_webapp_address = configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY].split(":")
      if valid_collector_webapp_address(collector_webapp_address):
        collector_port = int(collector_webapp_address[1])
      else:
        return (RESULT_STATE_UNKNOWN, ['{0} value should be set as "fqdn_hostname:port", but set to {1}'.format(
          METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY, configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY])])

  # Request one more than num_anomalies so we can tell "more than the
  # warning threshold" apart from "exactly the warning threshold".
  # (Renamed from get_ema_anomalies_parameters: this alert uses the
  # KS trend method, not EMA.)
  get_ks_anomalies_parameters = {
    "method": "ks",
    "startTime": current_time - interval * 60 * 1000,
    "endTime": current_time,
    "limit": num_anomalies + 1
  }

  encoded_get_metrics_parameters = urllib.urlencode(get_ks_anomalies_parameters)

  ams_collector_conf_dir = "/etc/ambari-metrics-collector/conf"
  metric_truststore_ca_certs = 'ca.pem'
  ca_certs = os.path.join(ams_collector_conf_dir,
                          metric_truststore_ca_certs)
  metric_collector_https_enabled = str(configurations[AMS_HTTP_POLICY]) == "HTTPS_ONLY"

  try:
    conn = network.get_http_connection(
      collector_host,
      int(collector_port),
      metric_collector_https_enabled,
      ca_certs,
      ssl_version=AmbariConfig.get_resolved_config().get_force_https_protocol_value()
    )
    conn.request("GET", AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
    response = conn.getresponse()
    data = response.read()
    logger.info("Data read from metric anomaly endpoint")
    logger.info(data)
    conn.close()
  except Exception as e:
    # Log the root cause; the alert label stays generic for operators.
    # (Matches the logging added to alert_metrics_deviation.py.)
    logger.info(str(e))
    return (RESULT_STATE_UNKNOWN, ["Unable to retrieve anomaly metrics from the Ambari Metrics service."])

  if response.status != 200:
    # Log the response body to aid debugging of non-200 replies.
    logger.info(str(data))
    return (RESULT_STATE_UNKNOWN, ["Unable to retrieve anomaly metrics from the Ambari Metrics service."])

  data_json = json.loads(data)
  length = len(data_json["metrics"])
  logger.info("Number of anomalies returned : {0}".format(length))

  if length == 0:
    alert_state = RESULT_STATE_OK
    alert_label = 'No trend anomalies in the last {0} minutes.'.format(interval)
    logger.info(alert_label)
  elif length <= num_anomalies:
    # BUG FIX: previously compared against a hard-coded 5, ignoring the
    # configurable num_anomalies parameter.
    alert_state = RESULT_STATE_WARNING
    # TODO(review): hard-coded prototype dashboard host; make configurable.
    alert_label = "http://avijayan-ad-1.openstacklocal:3000/dashboard/script/scripted.js?anomalies=" + data
  else:
    alert_state = RESULT_STATE_CRITICAL
    alert_label = "http://avijayan-ad-1.openstacklocal:3000/dashboard/script/scripted.js?anomalies=" + data

  return (alert_state, [alert_label])
+
+
def valid_collector_webapp_address(webapp_address):
  """
  Return True when webapp_address, pre-split into parts, is a usable
  [host, port] pair: exactly two components, host is not the loopback
  literal '127.0.0.1', and the port is numeric.
  """
  # Guard clauses: reject each invalid shape in turn.
  if len(webapp_address) != 2:
    return False
  if webapp_address[0] == '127.0.0.1':
    return False
  if not webapp_address[1].isdigit():
    return False
  return True
+
+
+def _coerce_to_integer(value):
+ """
+ Attempts to correctly coerce a value to an integer. For the case of an integer or a float,
+ this will essentially either NOOP or return a truncated value. If the parameter is a string,
+ then it will first attempt to be coerced from a integer, and failing that, a float.
+ :param value: the value to coerce
+ :return: the coerced value as an integer
+ """
+ try:
+ return int(value)
+ except ValueError:
+ return int(float(value))
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/alerts/alert_metrics_deviation.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/alerts/alert_metrics_deviation.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/alerts/alert_metrics_deviation.py
index bc2102a..7546de0 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/alerts/alert_metrics_deviation.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/3.0.0.3.0/package/alerts/alert_metrics_deviation.py
@@ -325,10 +325,12 @@ def execute(configurations={}, parameters={}, host_name=None):
response = conn.getresponse()
data = response.read()
conn.close()
- except Exception:
+ except Exception, e:
+ logger.info(str(e))
return (RESULT_STATE_UNKNOWN, ["Unable to retrieve metrics from the Ambari Metrics service."])
if response.status != 200:
+ logger.info(str(data))
return (RESULT_STATE_UNKNOWN, ["Unable to retrieve metrics from the Ambari Metrics service."])
data_json = json.loads(data)
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
index 785b36b..6bd2f4b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
@@ -39,6 +39,7 @@ public class MetricsPaddingMethodTest {
timelineMetric.setMetricName("m1");
timelineMetric.setHostName("h1");
timelineMetric.setAppId("a1");
+ timelineMetric.setStartTime(now);
TreeMap<Long, Double> inputValues = new TreeMap<>();
inputValues.put(now - 1000, 1.0d);
inputValues.put(now - 2000, 2.0d);
@@ -66,6 +67,7 @@ public class MetricsPaddingMethodTest {
timelineMetric.setMetricName("m1");
timelineMetric.setHostName("h1");
timelineMetric.setAppId("a1");
+ timelineMetric.setStartTime(now);
TreeMap<Long, Double> inputValues = new TreeMap<>();
inputValues.put(now - 1000, 1.0d);
inputValues.put(now - 2000, 2.0d);
@@ -93,6 +95,7 @@ public class MetricsPaddingMethodTest {
timelineMetric.setMetricName("m1");
timelineMetric.setHostName("h1");
timelineMetric.setAppId("a1");
+ timelineMetric.setStartTime(now);
TreeMap<Long, Double> inputValues = new TreeMap<>();
inputValues.put(now, 0.0d);
inputValues.put(now - 1000, 1.0d);
@@ -120,6 +123,7 @@ public class MetricsPaddingMethodTest {
timelineMetric.setMetricName("m1");
timelineMetric.setHostName("h1");
timelineMetric.setAppId("a1");
+ timelineMetric.setStartTime(now);
TreeMap<Long, Double> inputValues = new TreeMap<>();
inputValues.put(now - 1000, 1.0d);
timelineMetric.setMetricValues(inputValues);
@@ -145,6 +149,7 @@ public class MetricsPaddingMethodTest {
timelineMetric.setMetricName("m1");
timelineMetric.setHostName("h1");
timelineMetric.setAppId("a1");
+ timelineMetric.setStartTime(now);
TreeMap<Long, Double> inputValues = new TreeMap<>();
inputValues.put(now - 1000, 1.0d);
timelineMetric.setMetricValues(inputValues);
@@ -168,6 +173,7 @@ public class MetricsPaddingMethodTest {
timelineMetric.setMetricName("m1");
timelineMetric.setHostName("h1");
timelineMetric.setAppId("a1");
+ timelineMetric.setStartTime(now);
TreeMap<Long, Double> inputValues = new TreeMap<>();
long seconds = 1000;
@@ -228,6 +234,7 @@ public class MetricsPaddingMethodTest {
timelineMetric.setMetricName("m1");
timelineMetric.setHostName("h1");
timelineMetric.setAppId("a1");
+ timelineMetric.setStartTime(now);
TreeMap<Long, Double> inputValues = new TreeMap<>();
inputValues.put(now - 100, 1.0d);
inputValues.put(now - 200, 2.0d);
[2/2] ambari git commit: AMBARI-21106 : ML-Prototype: Detect
timeseries anomaly for a metric. (Refine PIT & Trend subsystems,
Integrate with AMS, Ambari Alerts.)
Posted by av...@apache.org.
AMBARI-21106 : ML-Prototype: Detect timeseries anomaly for a metric. (Refine PIT & Trend subsystems, Integrate with AMS, Ambari Alerts.)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a11d1033
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a11d1033
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a11d1033
Branch: refs/heads/branch-3.0-ams
Commit: a11d1033d07a298a15444de2985e0f2150c83626
Parents: 63e7435
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Tue Sep 26 14:38:40 2017 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Tue Sep 26 14:38:40 2017 -0700
----------------------------------------------------------------------
.../prototype/AmbariServerInterface.java | 1 -
.../prototype/MetricSparkConsumer.java | 113 ++++++++---
.../prototype/MetricsCollectorInterface.java | 10 +-
.../prototype/PointInTimeADSystem.java | 18 +-
.../alertservice/prototype/TrendADSystem.java | 26 +--
.../prototype/methods/ema/EmaModel.java | 31 ++--
.../prototype/methods/ema/EmaTechnique.java | 21 ++-
.../prototype/methods/hsdev/HsdevTechnique.java | 26 +--
.../src/main/resources/R-scripts/tukeys.r | 17 +-
.../src/main/resources/input-config.properties | 24 +++
.../prototype/TestEmaTechnique.java | 22 ++-
.../alertservice/prototype/TestTukeys.java | 1 -
.../ambari-metrics-grafana/src/main/scripted.js | 118 ++++++++++++
.../metrics/TestMetricSeriesGenerator.java | 87 +++++++++
.../timeline/HBaseTimelineMetricsService.java | 19 +-
.../metrics/timeline/PhoenixHBaseAccessor.java | 122 +++++++++++-
.../timeline/TimelineMetricConfiguration.java | 7 +
.../metrics/timeline/TimelineMetricStore.java | 2 +
.../timeline/query/PhoenixTransactSQL.java | 94 ++++++++++
.../MetricAnomalyDetectorTestService.java | 87 +++++++++
.../webapp/TimelineWebServices.java | 36 +++-
.../timeline/TestTimelineMetricStore.java | 5 +
.../AMBARI_METRICS/0.1.0/alerts.json | 70 +++++++
.../alert_point_in_time_metric_anomalies.py | 185 +++++++++++++++++++
.../alerts/alert_trend_metric_anomalies.py | 185 +++++++++++++++++++
.../package/alerts/alert_metrics_deviation.py | 4 +-
.../timeline/MetricsPaddingMethodTest.java | 7 +
27 files changed, 1234 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java
index 0c1c6fc..b98f04c 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/AmbariServerInterface.java
@@ -76,7 +76,6 @@ public class AmbariServerInterface implements Serializable{
JSONArray array = jsonObject.getJSONArray("items");
for(int i = 0 ; i < array.length() ; i++){
JSONObject alertDefn = array.getJSONObject(i).getJSONObject("AlertDefinition");
- LOG.info("alertDefn : " + alertDefn.get("name"));
if (alertDefn.get("name") != null && alertDefn.get("name").equals("point_in_time_metrics_anomalies")) {
JSONObject sourceNode = alertDefn.getJSONObject("source");
JSONArray params = sourceNode.getJSONArray("parameters");
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
index 7735d6c..61b3dee 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricSparkConsumer.java
@@ -37,6 +37,12 @@ import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
import java.util.*;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public class MetricSparkConsumer {
@@ -47,38 +53,75 @@ public class MetricSparkConsumer {
private static long pitStartTime = System.currentTimeMillis();
private static long ksStartTime = pitStartTime;
private static long hdevStartTime = ksStartTime;
+ private static Set<Pattern> includeMetricPatterns = new HashSet<>();
+ private static Set<String> includedHosts = new HashSet<>();
+ private static Set<TrendMetric> trendMetrics = new HashSet<>();
public MetricSparkConsumer() {
}
+ public static Properties readProperties(String propertiesFile) {
+ try {
+ Properties properties = new Properties();
+ InputStream inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile);
+ if (inputStream == null) {
+ inputStream = new FileInputStream(propertiesFile);
+ }
+ properties.load(inputStream);
+ return properties;
+ } catch (IOException ioEx) {
+ LOG.error("Error reading properties file for jmeter");
+ return null;
+ }
+ }
+
public static void main(String[] args) throws InterruptedException {
- if (args.length < 5) {
- System.err.println("Usage: MetricSparkConsumer <appid1,appid2> <collector_host> <port> <protocol> <zkQuorum>");
+ if (args.length < 1) {
+ System.err.println("Usage: MetricSparkConsumer <input-config-file>");
System.exit(1);
}
- List<String> appIds = Arrays.asList(args[0].split(","));
- String collectorHost = args[1];
- String collectorPort = args[2];
- String collectorProtocol = args[3];
- String zkQuorum = args[4];
+ Properties properties = readProperties(args[0]);
+
+ List<String> appIds = Arrays.asList(properties.getProperty("appIds").split(","));
+
+ String collectorHost = properties.getProperty("collectorHost");
+ String collectorPort = properties.getProperty("collectorPort");
+ String collectorProtocol = properties.getProperty("collectorProtocol");
- double emaW = StringUtils.isNotEmpty(args[5]) ? Double.parseDouble(args[5]) : 0.5;
- double emaN = StringUtils.isNotEmpty(args[8]) ? Double.parseDouble(args[6]) : 3;
- double tukeysN = StringUtils.isNotEmpty(args[7]) ? Double.parseDouble(args[7]) : 3;
+ String zkQuorum = properties.getProperty("zkQuorum");
- long pitTestInterval = StringUtils.isNotEmpty(args[8]) ? Long.parseLong(args[8]) : 5 * 60 * 1000;
- long pitTrainInterval = StringUtils.isNotEmpty(args[9]) ? Long.parseLong(args[9]) : 15 * 60 * 1000;
+ double emaW = Double.parseDouble(properties.getProperty("emaW"));
+ double emaN = Double.parseDouble(properties.getProperty("emaN"));
+ int emaThreshold = Integer.parseInt(properties.getProperty("emaThreshold"));
+ double tukeysN = Double.parseDouble(properties.getProperty("tukeysN"));
- String fileName = args[10];
- long ksTestInterval = StringUtils.isNotEmpty(args[11]) ? Long.parseLong(args[11]) : 10 * 60 * 1000;
- long ksTrainInterval = StringUtils.isNotEmpty(args[12]) ? Long.parseLong(args[12]) : 10 * 60 * 1000;
- int hsdevNhp = StringUtils.isNotEmpty(args[13]) ? Integer.parseInt(args[13]) : 3;
- long hsdevInterval = StringUtils.isNotEmpty(args[14]) ? Long.parseLong(args[14]) : 30 * 60 * 1000;
+ long pitTestInterval = Long.parseLong(properties.getProperty("pointInTimeTestInterval"));
+ long pitTrainInterval = Long.parseLong(properties.getProperty("pointInTimeTrainInterval"));
- String ambariServerHost = args[15];
- String clusterName = args[16];
+ long ksTestInterval = Long.parseLong(properties.getProperty("ksTestInterval"));
+ long ksTrainInterval = Long.parseLong(properties.getProperty("ksTrainInterval"));
+ int hsdevNhp = Integer.parseInt(properties.getProperty("hsdevNhp"));
+ long hsdevInterval = Long.parseLong(properties.getProperty("hsdevInterval"));
+
+ String ambariServerHost = properties.getProperty("ambariServerHost");
+ String clusterName = properties.getProperty("clusterName");
+
+ String includeMetricPatternStrings = properties.getProperty("includeMetricPatterns");
+ if (includeMetricPatternStrings != null && !includeMetricPatternStrings.isEmpty()) {
+ String[] patterns = includeMetricPatternStrings.split(",");
+ for (String p : patterns) {
+ LOG.info("Included Pattern : " + p);
+ includeMetricPatterns.add(Pattern.compile(p));
+ }
+ }
+
+ String includedHostList = properties.getProperty("hosts");
+ if (includedHostList != null && !includedHostList.isEmpty()) {
+ String[] hosts = includedHostList.split(",");
+ includedHosts.addAll(Arrays.asList(hosts));
+ }
MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort);
@@ -86,7 +129,7 @@ public class MetricSparkConsumer {
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
- EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN);
+ EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN, emaThreshold);
PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(metricsCollectorInterface,
tukeysN,
pitTestInterval,
@@ -97,13 +140,14 @@ public class MetricSparkConsumer {
TrendADSystem trendADSystem = new TrendADSystem(metricsCollectorInterface,
ksTestInterval,
ksTrainInterval,
- hsdevNhp,
- fileName);
+ hsdevNhp);
Broadcast<EmaTechnique> emaTechniqueBroadcast = jssc.sparkContext().broadcast(emaTechnique);
Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast = jssc.sparkContext().broadcast(pointInTimeADSystem);
Broadcast<TrendADSystem> trendADSystemBroadcast = jssc.sparkContext().broadcast(trendADSystem);
Broadcast<MetricsCollectorInterface> metricsCollectorInterfaceBroadcast = jssc.sparkContext().broadcast(metricsCollectorInterface);
+ Broadcast<Set<Pattern>> includePatternBroadcast = jssc.sparkContext().broadcast(includeMetricPatterns);
+ Broadcast<Set<String>> includedHostBroadcast = jssc.sparkContext().broadcast(includedHosts);
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));
@@ -150,7 +194,7 @@ public class MetricSparkConsumer {
if (currentTime > ksStartTime + ksTestInterval) {
LOG.info("Running KS Test....");
- trendADSystemBroadcast.getValue().runKSTest(currentTime);
+ trendADSystemBroadcast.getValue().runKSTest(currentTime, trendMetrics);
ksStartTime = ksStartTime + ksTestInterval;
}
@@ -162,8 +206,27 @@ public class MetricSparkConsumer {
TimelineMetrics metrics = tuple2._2();
for (TimelineMetric timelineMetric : metrics.getMetrics()) {
- List<MetricAnomaly> anomalies = ema.test(timelineMetric);
- metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
+
+ boolean includeHost = includedHostBroadcast.getValue().contains(timelineMetric.getHostName());
+ boolean includeMetric = false;
+ if (includeHost) {
+ if (includePatternBroadcast.getValue().isEmpty()) {
+ includeMetric = true;
+ }
+ for (Pattern p : includePatternBroadcast.getValue()) {
+ Matcher m = p.matcher(timelineMetric.getMetricName());
+ if (m.find()) {
+ includeMetric = true;
+ }
+ }
+ }
+
+ if (includeMetric) {
+ trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(),
+ timelineMetric.getHostName()));
+ List<MetricAnomaly> anomalies = ema.test(timelineMetric);
+ metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
+ }
}
});
});
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
index 7b3f63d..dab4a0a 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/MetricsCollectorInterface.java
@@ -96,7 +96,7 @@ public class MetricsCollectorInterface implements Serializable {
emitMetrics(timelineMetrics);
}
} else {
- LOG.info("No anomalies to send.");
+ LOG.debug("No anomalies to send.");
}
}
@@ -130,7 +130,7 @@ public class MetricsCollectorInterface implements Serializable {
public boolean emitMetrics(TimelineMetrics metrics) {
String connectUrl = constructTimelineMetricUri();
String jsonData = null;
- LOG.info("EmitMetrics connectUrl = " + connectUrl);
+ LOG.debug("EmitMetrics connectUrl = " + connectUrl);
try {
jsonData = mapper.writeValueAsString(metrics);
LOG.info(jsonData);
@@ -202,7 +202,7 @@ public class MetricsCollectorInterface implements Serializable {
String url = constructTimelineMetricUri() + "?metricNames=" + metricName + "&appId=" + appId +
"&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + endtime;
- LOG.info("Fetch metrics URL : " + url);
+ LOG.debug("Fetch metrics URL : " + url);
URL obj = null;
BufferedReader in = null;
@@ -213,8 +213,8 @@ public class MetricsCollectorInterface implements Serializable {
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
con.setRequestMethod("GET");
int responseCode = con.getResponseCode();
- LOG.info("Sending 'GET' request to URL : " + url);
- LOG.info("Response Code : " + responseCode);
+ LOG.debug("Sending 'GET' request to URL : " + url);
+ LOG.debug("Response Code : " + responseCode);
in = new BufferedReader(
new InputStreamReader(con.getInputStream()));
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
index b4a8593..b3e7bd3 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/PointInTimeADSystem.java
@@ -49,7 +49,7 @@ public class PointInTimeADSystem implements Serializable {
private AmbariServerInterface ambariServerInterface;
private int sensitivity = 50;
private int minSensitivity = 0;
- private int maxSensitivity = 10;
+ private int maxSensitivity = 100;
public PointInTimeADSystem(MetricsCollectorInterface metricsCollectorInterface, double defaultTukeysN,
long testIntervalMillis, long trainIntervalMillis, String ambariServerHost, String clusterName) {
@@ -73,13 +73,13 @@ public class PointInTimeADSystem implements Serializable {
if (requiredSensivity > sensitivity) {
int targetSensitivity = Math.min(maxSensitivity, requiredSensivity);
while (sensitivity < targetSensitivity) {
- defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.1;
+ defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.05;
sensitivity++;
}
} else {
int targetSensitivity = Math.max(minSensitivity, requiredSensivity);
while (sensitivity > targetSensitivity) {
- defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.1;
+ defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.05;
sensitivity--;
}
}
@@ -201,10 +201,10 @@ public class PointInTimeADSystem implements Serializable {
if (recall < 0.5) {
LOG.info("Increasing EMA sensitivity by 10%");
- emaModel.updateModel(true, 10);
+ emaModel.updateModel(true, 5);
} else if (precision < 0.5) {
LOG.info("Decreasing EMA sensitivity by 10%");
- emaModel.updateModel(false, 10);
+ emaModel.updateModel(false, 5);
}
}
@@ -233,7 +233,7 @@ public class PointInTimeADSystem implements Serializable {
double[] anomalyScore = result.resultset.get(2);
for (int i = 0; i < ts.length; i++) {
TimelineMetric timelineMetric = new TimelineMetric();
- timelineMetric.setMetricName(metricName + "_" + appId + "_" + hostname);
+ timelineMetric.setMetricName(metricName + ":" + appId + ":" + hostname);
timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-tukeys");
timelineMetric.setInstanceId(null);
@@ -243,7 +243,11 @@ public class PointInTimeADSystem implements Serializable {
HashMap<String, String> metadata = new HashMap<>();
metadata.put("method", "tukeys");
- metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
+ if (String.valueOf(anomalyScore[i]).equals("infinity")) {
+ LOG.info("Got anomalyScore = infinity for " + metricName + ":" + appId + ":" + hostname);
+ } else {
+ metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
+ }
timelineMetric.setMetadata(metadata);
timelineMetric.setMetricValues(metricValues);
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
index 1534b55..df36a4a 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/TrendADSystem.java
@@ -31,11 +31,11 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
public class TrendADSystem implements Serializable {
@@ -57,8 +57,7 @@ public class TrendADSystem implements Serializable {
public TrendADSystem(MetricsCollectorInterface metricsCollectorInterface,
long ksTestIntervalMillis,
long ksTrainIntervalMillis,
- int hsdevNumHistoricalPeriods,
- String inputFileName) {
+ int hsdevNumHistoricalPeriods) {
this.metricsCollectorInterface = metricsCollectorInterface;
this.ksTestIntervalMillis = ksTestIntervalMillis;
@@ -69,11 +68,9 @@ public class TrendADSystem implements Serializable {
this.hsdevTechnique = new HsdevTechnique();
trendMetrics = new ArrayList<>();
- this.inputFile = inputFileName;
- readInputFile(inputFileName);
}
- public void runKSTest(long currentEndTime) {
+ public void runKSTest(long currentEndTime, Set<TrendMetric> trendMetrics) {
readInputFile(inputFile);
long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis;
@@ -85,7 +82,7 @@ public class TrendADSystem implements Serializable {
String metricName = metric.metricName;
String appId = metric.appId;
String hostname = metric.hostname;
- String key = metricName + "_" + appId + "_" + hostname;
+ String key = metricName + ":" + appId + ":" + hostname;
TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, ksTestIntervalStartTime - ksTrainIntervalMillis,
currentEndTime);
@@ -112,6 +109,7 @@ public class TrendADSystem implements Serializable {
}
}
+ LOG.info("Train Data size : " + trainDataList.size() + ", Test Data Size : " + testDataList.size());
if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
LOG.info("Not enough train/test data to perform KS analysis.");
continue;
@@ -184,6 +182,7 @@ public class TrendADSystem implements Serializable {
return timelineMetric;
}
+
public void runHsdevMethod() {
List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>();
@@ -315,17 +314,4 @@ public class TrendADSystem implements Serializable {
this.hostname = hostname;
}
}
-
- /*
- boolean isPresent = false;
- for (TrendMetric trendMetric : trendMetrics) {
- if (trendMetric.metricName.equalsIgnoreCase(splits[0])) {
- isPresent = true;
- }
- }
- if (!isPresent) {
- LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]);
- trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2]));
- }
- */
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
index 5e1f76b..a31410d 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
@@ -23,6 +23,8 @@ import org.apache.commons.logging.LogFactory;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
+import static org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique.suppressAnomaliesTheshold;
+
@XmlRootElement
public class EmaModel implements Serializable {
@@ -35,7 +37,6 @@ public class EmaModel implements Serializable {
private double timessdev;
private int ctr = 0;
- private static final int suppressAnomaliesTheshold = 30;
private static final Log LOG = LogFactory.getLog(EmaModel.class);
@@ -64,30 +65,36 @@ public class EmaModel implements Serializable {
public double testAndUpdate(double metricValue) {
double anomalyScore = 0.0;
+ LOG.info("Before Update ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + ", timessdev = " + timessdev);
+ update(metricValue);
if (ctr > suppressAnomaliesTheshold) {
anomalyScore = test(metricValue);
- }
- if (Math.abs(anomalyScore) < 2 * timessdev) {
- update(metricValue);
+ if (anomalyScore > 0.0) {
+ LOG.info("Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems +
+ ", timessdev = " + timessdev + ", metricValue = " + metricValue);
+ } else {
+ LOG.info("Not an Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems +
+ ", timessdev = " + timessdev + ", metricValue = " + metricValue);
+ }
} else {
- LOG.info("Not updating model for this value");
+ ctr++;
+ if (ctr > suppressAnomaliesTheshold) {
+ LOG.info("Ema Model for " + metricName + ":" + appId + ":" + hostname + " is ready for testing data.");
+ }
}
- ctr++;
- LOG.info("Counter : " + ctr);
- LOG.info("Anomaly Score for " + metricValue + " : " + anomalyScore);
return anomalyScore;
}
public void update(double metricValue) {
ema = weight * ema + (1 - weight) * metricValue;
ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0));
- LOG.info("In update : ema = " + ema + ", ems = " + ems);
+ LOG.debug("In update : ema = " + ema + ", ems = " + ems);
}
public double test(double metricValue) {
- LOG.info("In test : ema = " + ema + ", ems = " + ems);
+ LOG.debug("In test : ema = " + ema + ", ems = " + ems);
double diff = Math.abs(ema - metricValue) - (timessdev * ems);
- LOG.info("diff = " + diff);
+ LOG.debug("diff = " + diff);
if (diff > 0) {
return Math.abs((metricValue - ema) / ems); //Z score
} else {
@@ -102,7 +109,7 @@ public class EmaModel implements Serializable {
delta = delta * -1;
}
this.timessdev = timessdev + delta * timessdev;
- this.weight = Math.min(1.0, weight + delta * weight);
+ //this.weight = Math.min(1.0, weight + delta * weight);
LOG.info("New model parameters " + metricName + " : timessdev = " + timessdev + ", weight = " + weight);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
index c005e6f..52c6cf3 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
@@ -49,6 +49,15 @@ public class EmaTechnique extends AnomalyDetectionTechnique implements Serializa
private double startingWeight = 0.5;
private double startTimesSdev = 3.0;
private String methodType = "ema";
+ public static int suppressAnomaliesTheshold = 100;
+
+ public EmaTechnique(double startingWeight, double startTimesSdev, int suppressAnomaliesTheshold) {
+ trackedEmas = new HashMap<>();
+ this.startingWeight = startingWeight;
+ this.startTimesSdev = startTimesSdev;
+ EmaTechnique.suppressAnomaliesTheshold = suppressAnomaliesTheshold;
+ LOG.info("New EmaTechnique......");
+ }
public EmaTechnique(double startingWeight, double startTimesSdev) {
trackedEmas = new HashMap<>();
@@ -61,16 +70,16 @@ public class EmaTechnique extends AnomalyDetectionTechnique implements Serializa
String metricName = metric.getMetricName();
String appId = metric.getAppId();
String hostname = metric.getHostName();
- String key = metricName + "_" + appId + "_" + hostname;
+ String key = metricName + ":" + appId + ":" + hostname;
EmaModel emaModel = trackedEmas.get(key);
if (emaModel == null) {
- LOG.info("EmaModel not present for " + key);
- LOG.info("Number of tracked Emas : " + trackedEmas.size());
+ LOG.debug("EmaModel not present for " + key);
+ LOG.debug("Number of tracked Emas : " + trackedEmas.size());
emaModel = new EmaModel(metricName, hostname, appId, startingWeight, startTimesSdev);
trackedEmas.put(key, emaModel);
} else {
- LOG.info("EmaModel already present for " + key);
+ LOG.debug("EmaModel already present for " + key);
}
List<MetricAnomaly> anomalies = new ArrayList<>();
@@ -79,11 +88,11 @@ public class EmaTechnique extends AnomalyDetectionTechnique implements Serializa
double metricValue = metric.getMetricValues().get(timestamp);
double anomalyScore = emaModel.testAndUpdate(metricValue);
if (anomalyScore > 0.0) {
- LOG.info("Found anomaly for : " + key);
+ LOG.info("Found anomaly for : " + key + ", anomalyScore = " + anomalyScore);
MetricAnomaly metricAnomaly = new MetricAnomaly(key, timestamp, metricValue, methodType, anomalyScore);
anomalies.add(metricAnomaly);
} else {
- LOG.info("Discarding non-anomaly for : " + key);
+ LOG.debug("Discarding non-anomaly for : " + key);
}
}
return anomalies;
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
index 50bf9f2..04f4a73 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
@@ -58,19 +58,23 @@ public class HsdevTechnique implements Serializable {
double historicMedian = median(trainData.values);
double currentMedian = median(testData.values);
- double diff = Math.abs(currentMedian - historicMedian);
- LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1]));
- LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd);
- if (diff > n * historicSd) {
- double zScore = diff / historicSd;
- LOG.info("Z Score of current series : " + zScore);
- return new MetricAnomaly(key,
- (long) testData.ts[testLength - 1],
- testData.values[testLength - 1],
- methodType,
- zScore);
+ if (historicSd > 0) {
+ double diff = Math.abs(currentMedian - historicMedian);
+ LOG.info("Found anomaly for metric : " + key + " in the period ending " + new Date((long)testData.ts[testLength - 1]));
+ LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd);
+
+ if (diff > n * historicSd) {
+ double zScore = diff / historicSd;
+ LOG.info("Z Score of current series : " + zScore);
+ return new MetricAnomaly(key,
+ (long) testData.ts[testLength - 1],
+ testData.values[testLength - 1],
+ methodType,
+ zScore);
+ }
}
+
return null;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
index f33b6ec..0312226 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
@@ -26,20 +26,23 @@ ams_tukeys <- function(train_data, test_data, n) {
anomalies <- data.frame()
quantiles <- quantile(train_data[,2])
iqr <- quantiles[4] - quantiles[2]
+ niqr <- 0
for ( i in 1:length(test_data[,1])) {
x <- test_data[i,2]
lb <- quantiles[2] - n*iqr
ub <- quantiles[4] + n*iqr
if ( (x < lb) || (x > ub) ) {
- if (x < lb) {
- niqr <- (quantiles[2] - x) / iqr
- } else {
- niqr <- (x - quantiles[4]) / iqr
+ if (iqr != 0) {
+ if (x < lb) {
+ niqr <- (quantiles[2] - x) / iqr
+ } else {
+ niqr <- (x - quantiles[4]) / iqr
+ }
+ }
+ anomaly <- c(test_data[i,1], x, niqr)
+ anomalies <- rbind(anomalies, anomaly)
}
- anomaly <- c(test_data[i,1], x, niqr)
- anomalies <- rbind(anomalies, anomaly)
- }
}
if(length(anomalies) > 0) {
names(anomalies) <- c("TS", "Value", "niqr")
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-alertservice/src/main/resources/input-config.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/input-config.properties b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/input-config.properties
new file mode 100644
index 0000000..88304c7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/input-config.properties
@@ -0,0 +1,24 @@
+appIds=HOST
+
+collectorHost=localhost
+collectorPort=6188
+collectorProtocol=http
+
+zkQuorum=localhost:2181
+
+ambariServerHost=localhost
+clusterName=c1
+
+emaW=0.8
+emaN=3
+tukeysN=3
+pointInTimeTestInterval=300000
+pointInTimeTrainInterval=900000
+
+ksTestInterval=600000
+ksTrainInterval=600000
+hsdevNhp=3
+hsdevInterval=1800000;
+
+skipMetricPatterns=sdisk*,cpu_sintr*,proc*,disk*,boottime
+hosts=avijayan-ad-1.openstacklocal
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
index 539ca40..d1e2b41 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
@@ -21,21 +21,41 @@ import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
import java.util.List;
import java.util.TreeMap;
+import static org.apache.ambari.metrics.alertservice.prototype.TestRFunctionInvoker.getTS;
+
public class TestEmaTechnique {
+ private static double[] ts;
+ private static String fullFilePath;
+
+ @BeforeClass
+ public static void init() throws URISyntaxException {
+
+ Assume.assumeTrue(System.getenv("R_HOME") != null);
+ ts = getTS(1000);
+ URL url = ClassLoader.getSystemResource("R-scripts");
+ fullFilePath = new File(url.toURI()).getAbsolutePath();
+ RFunctionInvoker.setScriptsDir(fullFilePath);
+ }
+
@Test
public void testEmaInitialization() {
EmaTechnique ema = new EmaTechnique(0.5, 3);
Assert.assertTrue(ema.getTrackedEmas().isEmpty());
Assert.assertTrue(ema.getStartingWeight() == 0.5);
- Assert.assertTrue(ema.getStartTimesSdev() == 3);
+ Assert.assertTrue(ema.getStartTimesSdev() == 2);
}
@Test
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
index bb409cf..ef0125f 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
@@ -21,7 +21,6 @@ import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-grafana/src/main/scripted.js
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-grafana/src/main/scripted.js b/ambari-metrics/ambari-metrics-grafana/src/main/scripted.js
new file mode 100644
index 0000000..298535f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-grafana/src/main/scripted.js
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* global _ */
+
+/*
+ * Complex scripted dashboard
+ * This script generates a dashboard object that Grafana can load. It also takes a number of user
+ * supplied URL parameters (in the ARGS variable)
+ *
+ * Return a dashboard object, or a function
+ *
+ * For async scripts, return a function, this function must take a single callback function as argument,
+ * call this callback function with the dashboard object (look at scripted_async.js for an example)
+ */
+
+'use strict';
+
+// accessible variables in this scope
+var window, document, ARGS, $, jQuery, moment, kbn;
+
+// Setup some variables
+var dashboard;
+
+// All url parameters are available via the ARGS object
+var ARGS;
+
+// Initialize a skeleton with nothing but a rows array and service object
+dashboard = {
+ rows : [],
+};
+
+// Set a title
+dashboard.title = 'Scripted dash';
+
+// Set default time
+// time can be overridden in the url using from/to parameters, but this is
+// handled automatically in grafana core during dashboard initialization
+
+
+var obj = JSON.parse(ARGS.anomalies);
+var metrics = obj.metrics;
+var rows = metrics.length
+
+dashboard.time = {
+ from: "now-1h",
+ to: "now"
+};
+
+var metricSet = new Set();
+
+for (var i = 0; i < rows; i++) {
+
+ var key = metrics[i].metricname;
+ if (metricSet.has(key)) {
+ continue;
+ }
+ metricSet.add(key)
+ var metricKeyElements = key.split(":");
+ var metricName = metricKeyElements[0];
+ var appId = metricKeyElements[1];
+ var hostname = metricKeyElements[2];
+
+ dashboard.rows.push({
+ title: 'Chart',
+ height: '300px',
+ panels: [
+ {
+ title: metricName,
+ type: 'graph',
+ span: 12,
+ fill: 1,
+ linewidth: 2,
+ targets: [
+ {
+ "aggregator": "none",
+ "alias": metricName,
+ "app": appId,
+ "errors": {},
+ "metric": metricName,
+ "precision": "default",
+ "refId": "A",
+ "hosts": hostname
+ }
+ ],
+ seriesOverrides: [
+ {
+ alias: '/random/',
+ yaxis: 2,
+ fill: 0,
+ linewidth: 5
+ }
+ ],
+ tooltip: {
+ shared: true
+ }
+ }
+ ]
+ });
+}
+
+
+return dashboard;
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java
new file mode 100644
index 0000000..2420ef3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/TestMetricSeriesGenerator.java
@@ -0,0 +1,87 @@
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics;
+
+import org.apache.ambari.metrics.alertservice.prototype.TestSeriesInputRequest;
+import org.apache.ambari.metrics.alertservice.seriesgenerator.AbstractMetricSeries;
+import org.apache.ambari.metrics.alertservice.seriesgenerator.MetricSeriesGeneratorFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class TestMetricSeriesGenerator implements Runnable {
+
+ private Map<TestSeriesInputRequest, AbstractMetricSeries> configuredSeries = new HashMap<>();
+ private static final Log LOG = LogFactory.getLog(TestMetricSeriesGenerator.class);
+ private TimelineMetricStore metricStore;
+ private String hostname;
+
+ public TestMetricSeriesGenerator(TimelineMetricStore metricStore) {
+ this.metricStore = metricStore;
+ try {
+ this.hostname = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void addSeries(TestSeriesInputRequest inputRequest) {
+ if (!configuredSeries.containsKey(inputRequest)) {
+ AbstractMetricSeries metricSeries = MetricSeriesGeneratorFactory.generateSeries(inputRequest.getSeriesType(), inputRequest.getConfigs());
+ configuredSeries.put(inputRequest, metricSeries);
+ LOG.info("Added series " + inputRequest.getSeriesName());
+ }
+ }
+
+ public void removeSeries(String seriesName) {
+ boolean isPresent = false;
+ TestSeriesInputRequest tbd = null;
+ for (TestSeriesInputRequest inputRequest : configuredSeries.keySet()) {
+ if (inputRequest.getSeriesName().equals(seriesName)) {
+ isPresent = true;
+ tbd = inputRequest;
+ }
+ }
+ if (isPresent) {
+ LOG.info("Removing series " + seriesName);
+ configuredSeries.remove(tbd);
+ } else {
+ LOG.info("Series not found : " + seriesName);
+ }
+ }
+
+ @Override
+ public void run() {
+ long currentTime = System.currentTimeMillis();
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+ for (TestSeriesInputRequest input : configuredSeries.keySet()) {
+ AbstractMetricSeries metricSeries = configuredSeries.get(input);
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(input.getSeriesName());
+ timelineMetric.setAppId("anomaly-engine-test-metric");
+ timelineMetric.setInstanceId(null);
+ timelineMetric.setStartTime(currentTime);
+ timelineMetric.setHostName(hostname);
+ TreeMap<Long, Double> metricValues = new TreeMap();
+ metricValues.put(currentTime, metricSeries.nextValue());
+ timelineMetric.setMetricValues(metricValues);
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+ LOG.info("Emitting metric with appId = " + timelineMetric.getAppId());
+ }
+ try {
+ LOG.info("Publishing test metrics for " + timelineMetrics.getMetrics().size() + " series.");
+ metricStore.putMetrics(timelineMetrics);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
index d83902f..a4b0bcc 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@ -86,6 +86,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
private TimelineMetricMetadataManager metricMetadataManager;
private Integer defaultTopNHostsLimit;
private MetricCollectorHAController haController;
+// private MetricKafkaProducer metricKafkaProducer;
/**
* Construct the service.
@@ -155,6 +156,10 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
"start cache node", e);
}
}
+// String kafkaServers = configuration.getKafkaServers();
+// if (kafkaServers != null) {
+// metricKafkaProducer = new MetricKafkaProducer(kafkaServers);
+// }
defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
@@ -233,6 +238,11 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
}
@Override
+ public TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) throws SQLException {
+ return hBaseAccessor.getAnomalyMetricRecords(method, startTime, endTime, limit);
+ }
+
+ @Override
public TimelineMetrics getTimelineMetrics(List<String> metricNames,
List<String> hostnames, String applicationId, String instanceId,
Long startTime, Long endTime, Precision precision, Integer limit,
@@ -397,10 +407,17 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
cache.putMetrics(metrics.getMetrics(), metricMetadataManager);
}
+// try {
+// metricKafkaProducer.sendMetrics(metrics);
+//// if (metrics.getMetrics().size() != 0 && metrics.getMetrics().get(0).getAppId().equals("anomaly-engine-test-metric")) {
+//// }
+// } catch (Exception e) {
+// LOG.error(e);
+// }
+
return response;
}
-
@Override
public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
throws SQLException, IOException {
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index be51dcf..685e638 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATE_TABLE_SPLIT_POINTS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
@@ -35,7 +34,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_SPLIT_POINTS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES;
@@ -50,15 +48,18 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ANOMALY_METRICS_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_INSTANCE_HOST_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_ANOMALY_METRICS_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_TREND_ANOMALY_METRICS_TABLE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_HOSTED_APPS_METADATA_SQL;
@@ -73,7 +74,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.TREND_ANOMALY_METRICS_TABLE_NAME;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_ANOMALY_METRICS_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL;
@@ -81,6 +84,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_INSTANCE_HOST_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_TREND_ANOMALY_METRICS_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS;
import java.io.IOException;
@@ -309,11 +313,63 @@ public class PhoenixHBaseAccessor {
commitMetrics(Collections.singletonList(timelineMetrics));
}
+ private void commitAnomalyMetric(Connection conn, TimelineMetric metric) {
+ PreparedStatement metricRecordStmt = null;
+ try {
+
+ Map<String, String> metricMetadata = metric.getMetadata();
+
+
+ byte[] uuid = metadataManagerInstance.getUuid(metric);
+ if (uuid == null) {
+ LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
+ return;
+ }
+
+ if (metric.getAppId().equals("anomaly-engine-ks") || metric.getAppId().equals("anomaly-engine-hsdev")) {
+ metricRecordStmt = conn.prepareStatement(String.format(UPSERT_TREND_ANOMALY_METRICS_SQL,
+ TREND_ANOMALY_METRICS_TABLE_NAME));
+
+ metricRecordStmt.setBytes(1, uuid);
+ metricRecordStmt.setLong(2, metric.getStartTime());
+ metricRecordStmt.setLong(3, Long.parseLong(metricMetadata.get("test-start-time")));
+ metricRecordStmt.setLong(4, Long.parseLong(metricMetadata.get("train-start-time")));
+ metricRecordStmt.setLong(5, Long.parseLong(metricMetadata.get("train-end-time")));
+ String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+ metricRecordStmt.setString(6, json);
+ metricRecordStmt.setString(7, metric.getMetadata().get("method"));
+ double anomalyScore = metric.getMetadata().containsKey("anomaly-score") ? Double.parseDouble(metric.getMetadata().get("anomaly-score")) : 0.0;
+ metricRecordStmt.setDouble(8, anomalyScore);
+
+ } else {
+ metricRecordStmt = conn.prepareStatement(String.format(
+ UPSERT_ANOMALY_METRICS_SQL, ANOMALY_METRICS_TABLE_NAME));
+
+ metricRecordStmt.setBytes(1, uuid);
+ metricRecordStmt.setLong(2, metric.getStartTime());
+ String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+ metricRecordStmt.setString(3, json);
+ metricRecordStmt.setString(4, metric.getMetadata().get("method"));
+ double anomalyScore = metric.getMetadata().containsKey("anomaly-score") ? Double.parseDouble(metric.getMetadata().get("anomaly-score")) : 0.0;
+ metricRecordStmt.setDouble(5, anomalyScore);
+ }
+
+ try {
+ metricRecordStmt.executeUpdate();
+ } catch (SQLException sql) {
+ LOG.error("Failed on insert records to store.", sql);
+ }
+
+ } catch (Exception e) {
+ LOG.error("Failed on insert records to anomaly table.", e);
+ }
+
+ }
+
public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) {
LOG.debug("Committing metrics to store");
Connection conn = null;
PreparedStatement metricRecordStmt = null;
- long currentTime = System.currentTimeMillis();
try {
conn = getConnection();
@@ -321,6 +377,10 @@ public class PhoenixHBaseAccessor {
UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
for (TimelineMetrics timelineMetrics : timelineMetricsCollection) {
for (TimelineMetric metric : timelineMetrics.getMetrics()) {
+ if (metric.getAppId().startsWith("anomaly-engine") && !metric.getAppId().equals("anomaly-engine-test-metric")) {
+ commitAnomalyMetric(conn, metric);
+ }
+
metricRecordStmt.clearParameters();
if (LOG.isTraceEnabled()) {
@@ -469,6 +529,20 @@ public class PhoenixHBaseAccessor {
stmt.executeUpdate( String.format(CREATE_CONTAINER_METRICS_TABLE_SQL,
encoding, tableTTL.get(CONTAINER_METRICS_TABLE_NAME), compression));
+ //Anomaly Metrics
+ stmt.executeUpdate(String.format(CREATE_ANOMALY_METRICS_TABLE_SQL,
+ ANOMALY_METRICS_TABLE_NAME,
+ encoding,
+ tableTTL.get(METRICS_AGGREGATE_HOURLY_TABLE_NAME),
+ compression));
+
+ //Trend Anomaly Metrics
+ stmt.executeUpdate(String.format(CREATE_TREND_ANOMALY_METRICS_TABLE_SQL,
+ TREND_ANOMALY_METRICS_TABLE_NAME,
+ encoding,
+ tableTTL.get(METRICS_AGGREGATE_HOURLY_TABLE_NAME),
+ compression));
+
// Host level
String precisionSql = String.format(CREATE_METRICS_TABLE_SQL,
encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression);
@@ -842,6 +916,48 @@ public class PhoenixHBaseAccessor {
insertMetricRecords(metrics, false);
}
+ public TimelineMetrics getAnomalyMetricRecords(String method, long startTime, long endTime, Integer limit) throws SQLException {
+ Connection conn = getConnection();
+ PreparedStatement stmt = null;
+ ResultSet rs = null;
+ TimelineMetrics metrics = new TimelineMetrics();
+ try {
+ stmt = PhoenixTransactSQL.prepareAnomalyMetricsGetSqlStatement(conn, method, startTime, endTime, limit);
+ rs = stmt.executeQuery();
+ while (rs.next()) {
+
+ byte[] uuid = rs.getBytes("UUID");
+ TimelineMetric metric = metadataManagerInstance.getMetricFromUuid(uuid);
+
+ if (method.equals("ks") || method.equals("hsdev")) {
+ metric.setStartTime(rs.getLong("TEST_END_TIME"));
+ } else {
+ metric.setStartTime(rs.getLong("SERVER_TIME"));
+ }
+ metric.setInstanceId(null);
+
+ HashMap<String, String> metadata = new HashMap<>();
+ metadata.put("method", rs.getString("METHOD"));
+ metadata.put("anomaly-score", String.valueOf(rs.getDouble("ANOMALY_SCORE")));
+ if (method.equals("ks") || method.equals("hsdev")) {
+ metadata.put("test-start-time", String.valueOf(rs.getLong("TEST_START_TIME")));
+ metadata.put("train-start-time", String.valueOf(rs.getLong("TRAIN_START_TIME")));
+ metadata.put("train-end-time", String.valueOf(rs.getLong("TRAIN_END_TIME")));
+ }
+ metric.setMetadata(metadata);
+
+ TreeMap<Long, Double> sortedByTimeMetrics = readMetricFromJSON(rs.getString("METRICS"));
+ metric.setMetricValues(sortedByTimeMetrics);
+
+ metrics.getMetrics().add(metric);
+ }
+ } catch (Exception ex) {
+ LOG.error(ex);
+ }
+ return metrics;
+ }
+
+
@SuppressWarnings("unchecked")
public TimelineMetrics getMetricRecords(
final Condition condition, Multimap<String, List<Function>> metricFunctions)
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index b85a9b6..c86137b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -510,6 +510,13 @@ public class TimelineMetricConfiguration {
return defaultRpcAddress;
}
+ public String getKafkaServers() {
+ if (metricsConf != null) {
+ return metricsConf.get("timeline.metrics.kafka.servers", null);
+ }
+ return null;
+ }
+
public boolean isDistributedCollectorModeDisabled() {
try {
if (getMetricsConf() != null) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index dab4494..cdeefdc 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -107,4 +107,6 @@ public interface TimelineMetricStore {
* @return [ hostname ]
*/
List<String> getLiveInstances();
+
+ TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) throws SQLException;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index fe1fb06..fd0cc72 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -37,6 +38,27 @@ public class PhoenixTransactSQL {
public static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
+ public static final String CREATE_ANOMALY_METRICS_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS %s " +
+ "(UUID BINARY(20) NOT NULL, " +
+ "SERVER_TIME UNSIGNED_LONG NOT NULL, " +
+ "METRICS VARCHAR, " +
+ "METHOD VARCHAR, " +
+ "ANOMALY_SCORE DOUBLE CONSTRAINT pk " +
+ "PRIMARY KEY (UUID, SERVER_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='%s'";
+
+ public static final String CREATE_TREND_ANOMALY_METRICS_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS %s " +
+ "(UUID BINARY(20) NOT NULL, " +
+ "TEST_START_TIME UNSIGNED_LONG NOT NULL, " +
+ "TEST_END_TIME UNSIGNED_LONG NOT NULL, " +
+ "TRAIN_START_TIME UNSIGNED_LONG, " +
+ "TRAIN_END_TIME UNSIGNED_LONG, " +
+ "METRICS VARCHAR, " +
+ "METHOD VARCHAR, " +
+ "ANOMALY_SCORE DOUBLE CONSTRAINT pk " +
+ "PRIMARY KEY (UUID, TEST_START_TIME, TEST_END_TIME)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='%s'";
+
/**
* Create table to store individual metric records.
*/
@@ -146,6 +168,25 @@ public class PhoenixTransactSQL {
*/
public static final String ALTER_SQL = "ALTER TABLE %s SET TTL=%s";
+ public static final String UPSERT_ANOMALY_METRICS_SQL = "UPSERT INTO %s " +
+ "(UUID, " +
+ "SERVER_TIME, " +
+ "METRICS, " +
+ "METHOD, " +
+ "ANOMALY_SCORE) VALUES " +
+ "(?, ?, ?, ?, ?)";
+
+ public static final String UPSERT_TREND_ANOMALY_METRICS_SQL = "UPSERT INTO %s " +
+ "(UUID, " +
+ "TEST_START_TIME, " +
+ "TEST_END_TIME, " +
+ "TRAIN_START_TIME, " +
+ "TRAIN_END_TIME, " +
+ "METRICS, " +
+ "METHOD, " +
+ "ANOMALY_SCORE) VALUES " +
+ "(?, ?, ?, ?, ?, ?, ?, ?)";
+
/**
* Insert into metric records table.
*/
@@ -221,6 +262,22 @@ public class PhoenixTransactSQL {
public static final String UPSERT_INSTANCE_HOST_METADATA_SQL =
"UPSERT INTO INSTANCE_HOST_METADATA (INSTANCE_ID, HOSTNAME) VALUES (?, ?)";
+ public static final String GET_ANOMALY_METRIC_SQL = "SELECT UUID, SERVER_TIME, " +
+ "METRICS, " +
+ "METHOD, " +
+ "ANOMALY_SCORE " +
+ "FROM %s " +
+ "WHERE METHOD = ? AND SERVER_TIME > ? AND SERVER_TIME <= ? ORDER BY ANOMALY_SCORE DESC";
+
+ public static final String GET_TREND_ANOMALY_METRIC_SQL = "SELECT UUID, " +
+ "TEST_START_TIME, TEST_END_TIME, " +
+ "TRAIN_START_TIME, TRAIN_END_TIME, " +
+ "METRICS, " +
+ "METHOD, " +
+ "ANOMALY_SCORE " +
+ "FROM %s " +
+ "WHERE METHOD = ? AND TEST_END_TIME > ? AND TEST_END_TIME <= ? ORDER BY ANOMALY_SCORE DESC";
+
/**
* Retrieve a set of rows from metrics records table.
*/
@@ -333,6 +390,9 @@ public class PhoenixTransactSQL {
"%s AS SERVER_TIME, %s, 1, %s, %s FROM %s WHERE UUID IN %s AND SERVER_TIME > %s AND SERVER_TIME <= %s " +
"GROUP BY UUID ORDER BY %s DESC LIMIT %s";
+ public static final String ANOMALY_METRICS_TABLE_NAME = "METRIC_ANOMALIES";
+ public static final String TREND_ANOMALY_METRICS_TABLE_NAME = "TREND_METRIC_ANOMALIES";
+
public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
public static final String CONTAINER_METRICS_TABLE_NAME = "CONTAINER_METRICS";
@@ -395,6 +455,40 @@ public class PhoenixTransactSQL {
PhoenixTransactSQL.sortMergeJoinEnabled = sortMergeJoinEnabled;
}
+ public static PreparedStatement prepareAnomalyMetricsGetSqlStatement(Connection connection, String method,
+ long startTime, long endTime, Integer limit) throws SQLException {
+ StringBuilder sb = new StringBuilder();
+ if (method.equals("ema") || method.equals("tukeys")) {
+ sb.append(String.format(GET_ANOMALY_METRIC_SQL, ANOMALY_METRICS_TABLE_NAME));
+ } else {
+ sb.append(String.format(GET_TREND_ANOMALY_METRIC_SQL, TREND_ANOMALY_METRICS_TABLE_NAME));
+ }
+ if (limit != null) {
+ sb.append(" LIMIT " + limit);
+ }
+ PreparedStatement stmt = null;
+ try {
+ stmt = connection.prepareStatement(sb.toString());
+ int pos = 1;
+
+ stmt.setString(pos++, method);
+ stmt.setLong(pos++, startTime);
+ stmt.setLong(pos, endTime);
+ if (limit != null) {
+ stmt.setFetchSize(limit);
+ }
+
+ } catch (SQLException e) {
+ if (stmt != null) {
+ stmt.close();
+ }
+ throw e;
+ }
+
+ return stmt;
+ }
+
+
public static PreparedStatement prepareGetMetricsSqlStmt(Connection connection,
Condition condition) throws SQLException {
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java
new file mode 100644
index 0000000..6f7b14a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/MetricAnomalyDetectorTestService.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.ambari.metrics.alertservice.prototype.MetricAnomalyDetectorTestInput;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Singleton
+@Path("/ws/v1/metrictestservice")
+public class MetricAnomalyDetectorTestService {
+
+ private static final Log LOG = LogFactory.getLog(MetricAnomalyDetectorTestService.class);
+
+ @Inject
+ public MetricAnomalyDetectorTestService() {
+ }
+
+ private void init(HttpServletResponse response) {
+ response.setContentType(null);
+ }
+
+ @Path("/anomaly")
+ @POST
+ @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public TimelinePutResponse postAnomalyDetectionRequest(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ MetricAnomalyDetectorTestInput input) {
+
+ init(res);
+ if (input == null) {
+ return new TimelinePutResponse();
+ }
+
+ try {
+ return null;
+ } catch (Exception e) {
+ throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ @GET
+ @Path("/dataseries")
+ @Produces({MediaType.APPLICATION_JSON})
+ public TimelineMetrics getTestDataSeries(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @QueryParam("type") String seriesType,
+ @QueryParam("configs") String config
+ ) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index 472a787..20aba23 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -24,7 +24,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
@@ -37,6 +36,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.TestMetricSeriesGenerator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.EntityIdentifier;
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.webapp.BadRequestException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@@ -75,6 +76,10 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;
@@ -389,7 +394,7 @@ public class TimelineWebServices {
}
return timelineMetricStore.getTimelineMetrics(
- parseListStr(metricNames, ","), parseListStr(hostname, ","), appId, instanceId,
+ parseListStr(metricNames, ","), parseListStr(hostname, ","), appId, parseStr(instanceId),
parseLongStr(startTime), parseLongStr(endTime),
Precision.getPrecision(precision), parseIntStr(limit),
parseBoolean(grouped), parseTopNConfig(topN, topNFunction, isBottomN),
@@ -412,6 +417,25 @@ public class TimelineWebServices {
}
@GET
+ @Path("/metrics/anomalies")
+ @Produces({ MediaType.APPLICATION_JSON })
+ public TimelineMetrics getAnomalyMetrics(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ @QueryParam("method") String method,
+ @QueryParam("startTime") String startTime,
+ @QueryParam("endTime") String endTime,
+ @QueryParam("limit") String limit
+ ) {
+ init(res);
+
+ try {
+ return timelineMetricStore.getAnomalyMetrics(method, parseLongStr(startTime), parseLongStr(endTime), parseIntStr(limit));
+ } catch (Exception e) {
+ throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+ @GET
@Path("/metrics/metadata")
@Produces({ MediaType.APPLICATION_JSON })
public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(
@@ -660,6 +684,12 @@ public class TimelineWebServices {
}
private static String parseStr(String str) {
- return str == null ? null : str.trim();
+ String trimmedInstance = (str == null) ? null : str.trim();
+ if (trimmedInstance != null) {
+ if (trimmedInstance.isEmpty() || trimmedInstance.equalsIgnoreCase("undefined")) {
+ trimmedInstance = null;
+ }
+ }
+ return trimmedInstance;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index 95a0e86..eb772bc 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -119,6 +119,11 @@ public class TestTimelineMetricStore implements TimelineMetricStore {
return null;
}
+ @Override
+ public TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) {
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a11d1033/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json
index e41adb5..acecb62 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/alerts.json
@@ -142,6 +142,76 @@
"value": "{0} * 100"
}
}
+ },
+ {
+ "name": "point_in_time_metrics_anomalies",
+ "label": "Point in Time metric anomalies",
+ "description": "This service-level alert if there are metric anomalies in the last 10 mins or configured interval.",
+ "interval": 10,
+ "scope": "ANY",
+ "enabled": true,
+ "source": {
+ "type": "SCRIPT",
+ "path": "AMBARI_METRICS/0.1.0/package/alerts/alert_point_in_time_metric_anomalies.py",
+ "parameters": [
+ {
+ "name": "num_anomalies",
+ "display_name": "Value of N in Top 'N' anomalies to be reported.",
+ "value": 5,
+ "type": "NUMERIC",
+ "description": "Report only this amount of anomalies."
+ },
+ {
+ "name": "interval",
+ "display_name": "Query Time interval in minutes",
+ "value": 10,
+ "type": "NUMERIC",
+ "description": "Query Time interval in minutes."
+ },
+ {
+ "name": "sensitivity",
+ "display_name": "Alert Sensitivity",
+ "value": 50,
+ "type": "NUMERIC",
+ "description": "Sensitivity of the alert. Scale of 1 - 100. Default = 50."
+ }
+ ]
+ }
+ },
+ {
+ "name": "trend_metrics_anomalies",
+ "label": "Trend metric anomalies",
+ "description": "This service-level alert if there are metric anomalies in the last 10 mins or configured interval.",
+ "interval": 10,
+ "scope": "ANY",
+ "enabled": true,
+ "source": {
+ "type": "SCRIPT",
+ "path": "AMBARI_METRICS/0.1.0/package/alerts/alert_trend_metric_anomalies.py",
+ "parameters": [
+ {
+ "name": "num_anomalies",
+ "display_name": "Value of N in Top 'N' anomalies to be reported.",
+ "value": 5,
+ "type": "NUMERIC",
+ "description": "Report only this amount of anomalies."
+ },
+ {
+ "name": "interval",
+ "display_name": "Query Time interval in minutes",
+ "value": 10,
+ "type": "NUMERIC",
+ "description": "Query Time interval in minutes."
+ },
+ {
+ "name": "sensitivity",
+ "display_name": "Alert Sensitivity",
+ "value": 50,
+ "type": "NUMERIC",
+ "description": "Sensitivity of the alert. Scale of 1 - 100. Default = 50."
+ }
+ ]
+ }
}
],
"METRICS_MONITOR": [