You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2018/04/01 19:14:00 UTC
[ambari] 04/39: AMBARI-21106 : Ambari Metrics Anomaly detection
prototype (Commit 3). (avijayan)
This is an automated email from the ASF dual-hosted git repository.
avijayan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
commit b1ec22e38d60ffdda5cb7a65f9779da9de8b8a3c
Author: Aravindan Vijayan <av...@hortonworks.com>
AuthorDate: Tue May 30 13:35:54 2017 -0700
AMBARI-21106 : Ambari Metrics Anomaly detection prototype (Commit 3). (avijayan)
---
.../metrics/alertservice/R/RFunctionInvoker.java | 15 ++--
.../src/main/resources/R-scripts/ema.R | 79 ++++++++++++++++++++++
.../src/main/resources/R-scripts/hsdev.r | 60 ++++++++++++++++
.../src/main/resources/R-scripts/iforest.R | 35 ++++++++++
.../src/main/resources/R-scripts/kstest.r | 21 ++++++
.../src/main/resources/R-scripts/test.R | 67 ++++++++++++++++++
.../src/main/resources/R-scripts/tukeys.r | 26 +++++++
.../src/main/resources/R-scripts/util.R | 19 ++++++
8 files changed, 312 insertions(+), 10 deletions(-)
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java
index 8d1e520..71ad66d 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java
@@ -31,8 +31,7 @@ public class RFunctionInvoker {
public static ResultSet tukeys(DataSet trainData, DataSet testData, Map<String, String> configs) {
try {
- r.eval("library(ambarimetricsAD)");
- r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/tukeys.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+ r.eval("source('tukeys.r', echo=TRUE)");
int n = Integer.parseInt(configs.get("tukeys.n"));
r.eval("n <- " + n);
@@ -57,8 +56,7 @@ public class RFunctionInvoker {
public static ResultSet ema_global(DataSet trainData, DataSet testData, Map<String, String> configs) {
try {
- r.eval("library(ambarimetricsAD)");
- r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/ema.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+ r.eval("source('ema.R', echo=TRUE)");
int n = Integer.parseInt(configs.get("ema.n"));
r.eval("n <- " + n);
@@ -87,8 +85,7 @@ public class RFunctionInvoker {
public static ResultSet ema_daily(DataSet trainData, DataSet testData, Map<String, String> configs) {
try {
- r.eval("library(ambarimetricsAD)");
- r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/ema.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+ r.eval("source('ema.R', echo=TRUE)");
int n = Integer.parseInt(configs.get("ema.n"));
r.eval("n <- " + n);
@@ -117,8 +114,7 @@ public class RFunctionInvoker {
public static ResultSet ksTest(DataSet trainData, DataSet testData, Map<String, String> configs) {
try {
- r.eval("library(ambarimetricsAD)");
- r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/kstest.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+ r.eval("source('kstest.r', echo=TRUE)");
double p_value = Double.parseDouble(configs.get("ks.p_value"));
r.eval("p_value <- " + p_value);
@@ -144,8 +140,7 @@ public class RFunctionInvoker {
public static ResultSet hsdev(DataSet trainData, DataSet testData, Map<String, String> configs) {
try {
- r.eval("library(ambarimetricsAD)");
- r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/hsdev.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+ r.eval("source('hsdev.r', echo=TRUE)");
int n = Integer.parseInt(configs.get("hsdev.n"));
r.eval("n <- " + n);
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R
new file mode 100644
index 0000000..d3188f0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R
@@ -0,0 +1,79 @@
+# EMA <- w * EMA + (1 - w) * x
+# EMS <- sqrt( w * EMS^2 + (1 - w) * (x - EMA)^2 )
+# Alarm = abs(x - EMA) > n * EMS
+
+# Flags test points that deviate from an exponentially weighted moving
+# average (EMA) by more than n exponentially weighted standard deviations (EMS).
+#
+#   EMA <- w * EMA + (1 - w) * x
+#   EMS <- sqrt(w * EMS^2 + (1 - w) * (x - EMA)^2)
+#   Alarm when abs(x - EMA) > n * EMS
+#
+# Args:
+#   train_data: values used to warm up EMA/EMS. They are consumed with
+#               `for (x in train_data)`, i.e. element by element.
+#   test_data:  table whose column 1 holds the timestamp and column 2 the value.
+#   w:          weight given to the previous EMA/EMS.
+#   n:          alarm threshold, in multiples of EMS.
+#
+# Returns:
+#   A data frame with columns "TS" and "Value" (one row per anomaly), or an
+#   empty data.frame() when nothing is flagged.
+ema_global <- function(train_data, test_data, w, n) {
+
+  ema <- 0
+  ems <- 0
+
+  # Train step: fold the training values into the running EMA/EMS.
+  for (x in train_data) {
+    ema <- w * ema + (1 - w) * x
+    ems <- sqrt(w * ems^2 + (1 - w) * (x - ema)^2)
+  }
+
+  # Test step. seq_len() keeps the loop from running on empty input
+  # (1:0 would iterate over c(1, 0)); flags are collected in a preallocated
+  # vector instead of growing a data frame with rbind() on every hit.
+  num_points <- nrow(test_data)
+  is_anomaly <- logical(num_points)
+  for (i in seq_len(num_points)) {
+    x <- test_data[i, 2]
+    if (abs(x - ema) > n * ems) {
+      is_anomaly[i] <- TRUE
+    }
+    # The model keeps learning from every test point, anomalous or not.
+    ema <- w * ema + (1 - w) * x
+    ems <- sqrt(w * ems^2 + (1 - w) * (x - ema)^2)
+  }
+
+  if (!any(is_anomaly)) {
+    return(data.frame())
+  }
+  data.frame(
+    TS = as.numeric(test_data[is_anomaly, 1]),
+    Value = test_data[is_anomaly, 2]
+  )
+}
+
+# Same EMA/EMS alarm as ema_global, but with a separate EMA/EMS pair for each
+# day of the week, so a point is only compared with history from its weekday.
+#
+# Args:
+#   train_data: table whose column 1 holds the timestamp and column 2 the value.
+#   test_data:  table with the same layout as train_data.
+#   w:          weight given to the previous EMA/EMS.
+#   n:          alarm threshold, in multiples of EMS.
+#
+# Timestamps are divided by 1000 before conversion to POSIXlt (GMT), i.e. they
+# are treated as epoch milliseconds.
+#
+# Returns:
+#   A data frame with columns "TS" and "Value" (one row per anomaly), or an
+#   empty data.frame() when nothing is flagged.
+ema_daily <- function(train_data, test_data, w, n) {
+
+  ema <- vector("numeric", 7)
+  ems <- vector("numeric", 7)
+
+  # POSIXlt$wday runs 0 (Sunday) .. 6 while R vectors are 1-based, so the slot
+  # is wday + 1. The train step previously used wday directly, which trained a
+  # different slot than the test step read and silently dropped Sunday data.
+  weekday_slot <- function(ts) {
+    time <- as.POSIXlt(as.numeric(ts) / 1000, origin = "1970-01-01", tz = "GMT")
+    time$wday + 1
+  }
+
+  # Train step
+  for (i in seq_len(nrow(train_data))) {
+    x <- train_data[i, 2]
+    index <- weekday_slot(train_data[i, 1])
+    ema[index] <- w * ema[index] + (1 - w) * x
+    ems[index] <- sqrt(w * ems[index]^2 + (1 - w) * (x - ema[index])^2)
+  }
+
+  # Test step: flag, then keep learning from the point just examined.
+  num_points <- nrow(test_data)
+  is_anomaly <- logical(num_points)
+  for (i in seq_len(num_points)) {
+    x <- test_data[i, 2]
+    index <- weekday_slot(test_data[i, 1])
+
+    if (abs(x - ema[index]) > n * ems[index]) {
+      is_anomaly[i] <- TRUE
+    }
+    ema[index] <- w * ema[index] + (1 - w) * x
+    ems[index] <- sqrt(w * ems[index]^2 + (1 - w) * (x - ema[index])^2)
+  }
+
+  if (!any(is_anomaly)) {
+    return(data.frame())
+  }
+  data.frame(
+    TS = as.numeric(test_data[is_anomaly, 1]),
+    Value = test_data[is_anomaly, 2]
+  )
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
new file mode 100644
index 0000000..ff8a8f7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
@@ -0,0 +1,60 @@
+# Compares the median of the test window with the median of historic values
+# recorded on the same day of the week, and reports the window as anomalous
+# when the two medians differ by more than n historic standard deviations.
+#
+# Args:
+#   train_data:           table; column 1 = timestamp, column 2 = value.
+#   test_data:            table with the same layout; the window under test.
+#   n:                    threshold, in multiples of the historic sd.
+#   num_historic_periods: how many periods to look back from the test start.
+#   interval:             the look-back start is rounded down to a multiple of this.
+#   period:               length of one historic period, in timestamp units.
+#
+# Timestamps are divided by 1000 before conversion to POSIXlt (GMT), i.e. they
+# are treated as epoch milliseconds.
+#
+# Returns:
+#   An empty data.frame() when there is no anomaly or too little history,
+#   otherwise a one-row data frame with columns
+#   "TS Start", "TS End", "Current Median", "Past Median", " Past SD".
+hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval, period) {
+
+  anomalies <- data.frame()
+
+  # Nothing to evaluate; avoids NA timestamps further down.
+  if (nrow(test_data) == 0) {
+    return(anomalies)
+  }
+
+  test_start <- test_data[1, 1]
+  # Last row of the test window. The row count is needed here; the previous
+  # length(test_data[1,]) was the column count and always picked row 2.
+  test_end <- test_data[nrow(test_data), 1]
+  cat ("\n test_start : ", as.numeric(test_start))
+  train_start <- test_start - num_historic_periods*period
+  cat ("\n train_start : ", as.numeric(train_start))
+  # round to start of day
+  train_start <- train_start - (train_start %% interval)
+  cat ("\n train_start after rounding: ", as.numeric(train_start))
+
+  time <- as.POSIXlt(as.numeric(test_data[1, 1])/1000, origin = "1970-01-01", tz = "GMT")
+  test_data_day <- time$wday
+
+  # Walk the training rows from newest to oldest and stop at the first row
+  # older than train_start. rev(seq_len()) is empty for empty training data,
+  # whereas 0:1 would index row 0.
+  h_data <- c()
+  for (i in rev(seq_len(nrow(train_data)))) {
+    ts <- train_data[i, 1]
+    if (ts < train_start) {
+      cat ("\n Breaking out of loop : ", ts)
+      break
+    }
+    time <- as.POSIXlt(as.numeric(ts)/1000, origin = "1970-01-01", tz = "GMT")
+    if (time$wday == test_data_day) {
+      h_data <- c(h_data, train_data[i, 2])
+    }
+  }
+
+  cat ("\n Train data length : ", nrow(train_data))
+  cat ("\n Test data length : ", nrow(test_data))
+  cat ("\n Historic data length : ", length(h_data))
+  # Require at least twice as many historic points as test points.
+  if (length(h_data) < 2*nrow(test_data)) {
+    cat ("\nNot enough training data")
+    return (anomalies)
+  }
+
+  past_median <- median(h_data)
+  cat ("\npast_median : ", past_median)
+  past_sd <- sd(h_data)
+  cat ("\npast_sd : ", past_sd)
+  curr_median <- median(test_data[, 2])
+  cat ("\ncurr_median : ", curr_median)
+
+  if (abs(curr_median - past_median) > n * past_sd) {
+    anomaly <- c(test_start, test_end, curr_median, past_median, past_sd)
+    anomalies <- rbind(anomalies, anomaly)
+  }
+
+  if (length(anomalies) > 0) {
+    names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", " Past SD")
+  }
+
+  return (anomalies)
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R
new file mode 100644
index 0000000..1e0c534
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R
@@ -0,0 +1,35 @@
+# Fetches several metric series, merges them on their timestamp, fits isolation
+# trees on the training window and scores every row of the test window.
+#
+# Args:
+#   url:                     location passed to get_data(); the parsed result is
+#                            read as res$metrics[[i]]$metricname / $metrics.
+#   train_start, train_end:  inclusive timestamp bounds of the training window.
+#   test_start, test_end:    inclusive timestamp bounds of the test window.
+#   threshold_score:         rows whose outF score exceeds this are reported.
+#
+# Relies on get_data(), IsolationTrees() and AnomalyScore() being available in
+# the calling environment; none of them is defined in this script.
+#
+# Returns:
+#   An empty data.frame() when nothing is flagged, otherwise a data frame with
+#   columns "TS", "Anomaly Score", "Path length".
+ams_iforest <- function(url, train_start, train_end, test_start, test_end, threshold_score) {
+
+  res <- get_data(url)
+  num_metrics <- length(res$metrics)
+  if (num_metrics == 0) {
+    return(data.frame())
+  }
+
+  # One two-column frame (TS, <metric name>) per metric.
+  metric_frame <- function(metric) {
+    frame <- data.frame(as.numeric(names(metric$metrics)), as.numeric(metric$metrics))
+    names(frame) <- c("TS", metric$metricname)
+    frame
+  }
+
+  data <- metric_frame(res$metrics[[1]])
+  # seq_len(n)[-1] is empty for a single metric; 2:1 would iterate backwards.
+  for (i in seq_len(num_metrics)[-1]) {
+    # merge() joins on the shared column names, here "TS".
+    data <- merge(data, metric_frame(res$metrics[[i]]))
+  }
+
+  # Columns 2 .. num_metrics + 1 hold the metric values (column 1 is TS).
+  feature_cols <- seq_len(num_metrics) + 1
+
+  # Filter on the merged frame's own TS column. Row positions of an individual
+  # per-metric frame need not line up with the rows of the merged frame.
+  in_train <- which(data$TS >= train_start & data$TS <= train_end)
+  in_test <- which(data$TS >= test_start & data$TS <= test_end)
+
+  algo_data <- data[in_train, ][feature_cols]
+  iForest <- IsolationTrees(algo_data)
+  test_data <- data[in_test, ]
+
+  if_res <- AnomalyScore(test_data[feature_cols], iForest)
+  flagged <- which(if_res$outF > threshold_score)
+  if (length(flagged) == 0) {
+    return(data.frame())
+  }
+
+  data.frame(
+    "TS" = test_data[flagged, 1],
+    "Anomaly Score" = if_res$outF[flagged],
+    "Path length" = if_res$pathLength[flagged],
+    check.names = FALSE
+  )
+}
\ No newline at end of file
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
new file mode 100644
index 0000000..af21038
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
@@ -0,0 +1,21 @@
+# Runs a two-sample Kolmogorov-Smirnov test between the training values and the
+# test window, and reports the window when the p-value falls below p_value.
+#
+# Args:
+#   train_data: values passed as the first sample to ks.test().
+#   test_data:  table; column 1 = timestamp, column 2 = value (second sample).
+#   p_value:    significance threshold.
+#
+# Returns:
+#   An empty data.frame() when the test is not significant, otherwise a one-row
+#   data frame with columns "TS Start", "TS End", "D", "p-value".
+ams_ks <- function(train_data, test_data, p_value) {
+
+  anomalies <- data.frame()
+  res <- ks.test(train_data, test_data[, 2])
+
+  if (res$p.value < p_value) {
+    # nrow() gives the last row of the window; length() of a data frame is its
+    # column count. The statistic and p-value are extracted as plain numbers so
+    # the row is a numeric vector rather than a list.
+    anomaly <- c(
+      test_data[1, 1],
+      test_data[nrow(test_data), 1],
+      unname(res$statistic),
+      res$p.value
+    )
+    anomalies <- rbind(anomalies, anomaly)
+  }
+
+  if (length(anomalies) > 0) {
+    names(anomalies) <- c("TS Start", "TS End", "D", "p-value")
+  }
+  return (anomalies)
+}
\ No newline at end of file
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R
new file mode 100644
index 0000000..e66049f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R
@@ -0,0 +1,67 @@
+tukeys_anomalies <- data.frame()  # script-level accumulators, one per detection method
+ema_global_anomalies <- data.frame()
+ema_daily_anomalies <- data.frame()
+ks_anomalies <- data.frame()
+hsdev_anomalies <- data.frame()  # appended to with <<- inside test_methods()
+
+# Resets the script-level anomaly accumulators to empty data frames.
+#
+# The accumulators live outside this function, so they must be assigned with
+# <<-; a plain <- would only create locals that vanish when init() returns and
+# leave results from a previous run in place.
+init <- function() {
+  tukeys_anomalies <<- data.frame()
+  ema_global_anomalies <<- data.frame()
+  ema_daily_anomalies <<- data.frame()
+  ks_anomalies <<- data.frame()
+  hsdev_anomalies <<- data.frame()
+  invisible(NULL)
+}
+
+# Slides a train window and the following test window across `data`, one
+# day boundary at a time, running hsdev_daily() on each test window and
+# appending its findings to the script-level hsdev_anomalies.
+#
+# Args:
+#   data: table with a TS column; column 1 is read as the timestamp.
+#
+# Returns:
+#   hsdev_anomalies after the final window has been processed.
+test_methods <- function(data) {
+
+  init()
+
+  last_ts <- data[length(data[,1]),1]
+  spacing <- data[2,1] - data[1,1]
+  one_day <- 24*60*60*1000
+
+  # Initial windows: train runs up to the next day boundary, test starts one
+  # sample later and runs to the boundary after that.
+  train_start <- data[1,1]
+  train_end <- get_next_day_boundary(train_start, spacing, last_ts)
+  test_start <- train_end + spacing
+  test_end <- get_next_day_boundary(test_start, spacing, last_ts)
+
+  iteration <- 1
+  while (test_start < last_ts) {
+
+    print (iteration)
+    iteration <- iteration + 1
+
+    in_train <- data$TS >= train_start & data$TS <= train_end
+    in_test <- data$TS >= test_start & data$TS <= test_end
+    train_data <- data[which(in_train),]
+    test_data <- data[which(in_test), ]
+
+    # Only the hsdev method is active; it trains on everything before the test
+    # window. The tukeys / ema_global / ema_daily / ks calls are disabled:
+    #tukeys_anomalies <<- rbind(tukeys_anomalies, ams_tukeys(train_data, test_data, 3))
+    #ema_global_anomalies <<- rbind(ema_global_anomalies, ema_global(train_data, test_data, 0.9, 3))
+    #ema_daily_anomalies <<- rbind(ema_daily_anomalies, ema_daily(train_data, test_data, 0.9, 3))
+    #ks_anomalies <<- rbind(ks_anomalies, ams_ks(train_data, test_data, 0.05))
+    history <- data[which(data$TS < test_start),]
+    found <- hsdev_daily(history, test_data, 3, 3, one_day, 7*one_day)
+    hsdev_anomalies <<- rbind(hsdev_anomalies, found)
+
+    # Advance: the old test window becomes the new train window.
+    train_start <- test_start
+    train_end <- get_next_day_boundary(train_start, spacing, last_ts)
+    test_start <- train_end + spacing
+    test_end <- get_next_day_boundary(test_start, spacing, last_ts)
+  }
+  return (hsdev_anomalies)
+}
+
+# Steps forward from `start` in increments of `step` until it reaches a
+# timestamp that sits exactly `offset_ms` past a multiple of `day_ms`.
+#
+# Args:
+#   start:     first timestamp to examine.
+#   step:      increment between examined timestamps; must be positive whenever
+#              a step actually has to be taken.
+#   limit:     last timestamp that may be examined.
+#   day_ms:    length of a day; defaults to 24 hours in milliseconds.
+#   offset_ms: position of the boundary within the day; defaults to 28800000
+#              (8 hours in milliseconds), the value previously hard-coded.
+#
+# Returns:
+#   -1 when start is already past limit; the first boundary timestamp found at
+#   or before limit; otherwise the first examined timestamp beyond limit.
+get_next_day_boundary <- function(start, step, limit,
+                                  day_ms = 24*60*60*1000,
+                                  offset_ms = 28800000) {
+
+  if (start > limit) {
+    return (-1)
+  }
+
+  while (start <= limit) {
+    if ((start %% day_ms) == offset_ms) {
+      return (start)
+    }
+    # A non-positive step can never reach the limit and would loop forever.
+    if (step <= 0) {
+      stop("'step' must be positive", call. = FALSE)
+    }
+    start <- start + step
+  }
+  return (start)
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
new file mode 100644
index 0000000..38f71f2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
@@ -0,0 +1,26 @@
+# Tukey's fences: flags test values that fall more than n interquartile ranges
+# below the first quartile or above the third quartile of the training values.
+#
+# Args:
+#   train_data: table; column 2 holds the values the quartiles are taken from.
+#   test_data:  table; column 1 = timestamp, column 2 = value.
+#   n:          fence width, in multiples of the interquartile range.
+#
+# Returns:
+#   A data frame with columns "TS" and "Value" (one row per anomaly), or an
+#   empty data.frame() when nothing is flagged.
+ams_tukeys <- function(train_data, test_data, n) {
+
+  # quantile() returns the 0/25/50/75/100% points; [2] and [4] are Q1 and Q3.
+  quantiles <- quantile(train_data[, 2])
+  iqr <- quantiles[4] - quantiles[2]
+  lb <- unname(quantiles[2] - n * iqr)
+  ub <- unname(quantiles[4] + n * iqr)
+
+  # The fences do not change per row, so the comparison is done on the whole
+  # column at once; this also copes with an empty test window, where a
+  # 1:length() loop would index a non-existent first row.
+  values <- test_data[, 2]
+  outliers <- which(values < lb | values > ub)
+
+  if (length(outliers) == 0) {
+    return(data.frame())
+  }
+  data.frame(
+    TS = as.numeric(test_data[outliers, 1]),
+    Value = as.numeric(values[outliers])
+  )
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R
new file mode 100644
index 0000000..eb19d37
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R
@@ -0,0 +1,19 @@
+#url_prefix = 'http://104.196.95.78:3000/api/datasources/proxy/1/ws/v1/timeline/metrics?'
+#url_suffix = '&startTime=1459972944&endTime=1491508944&precision=MINUTES'
+#data_url <- paste(url_prefix, query, sep ="")
+#data_url <- paste(data_url, url_suffix, sep="")
+
+# Reads the first line available at `url` and parses it as JSON.
+#
+# Args:
+#   url: anything readLines() accepts.
+#
+# Returns:
+#   The structure produced by rjson's fromJSON() for that line.
+get_data <- function(url) {
+  library(rjson)
+  first_line <- readLines(url)[1]
+  parsed <- fromJSON(first_line)
+  parsed
+}
+
+# Finds the position of the first element of `data` that is numerically equal
+# to `ts`.
+#
+# Args:
+#   data: vector of values; each is compared after as.numeric().
+#   ts:   value to look for; compared after as.numeric().
+#
+# Returns:
+#   The 1-based index of the first match, or -1 when there is none (including
+#   when `data` is empty, where a 1:length() loop would fail on data[1]).
+find_index <- function(data, ts) {
+  hits <- which(as.numeric(data) == as.numeric(ts))
+  if (length(hits) == 0) {
+    return (-1)
+  }
+  hits[1]
+}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
avijayan@apache.org.