You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2018/04/01 19:14:00 UTC

[ambari] 04/39: AMBARI-21106 : Ambari Metrics Anomaly detection prototype (Commit 3). (avijayan)

This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit b1ec22e38d60ffdda5cb7a65f9779da9de8b8a3c
Author: Aravindan Vijayan <av...@hortonworks.com>
AuthorDate: Tue May 30 13:35:54 2017 -0700

    AMBARI-21106 : Ambari Metrics Anomaly detection prototype (Commit 3). (avijayan)
---
 .../metrics/alertservice/R/RFunctionInvoker.java   | 15 ++--
 .../src/main/resources/R-scripts/ema.R             | 79 ++++++++++++++++++++++
 .../src/main/resources/R-scripts/hsdev.r           | 60 ++++++++++++++++
 .../src/main/resources/R-scripts/iforest.R         | 35 ++++++++++
 .../src/main/resources/R-scripts/kstest.r          | 21 ++++++
 .../src/main/resources/R-scripts/test.R            | 67 ++++++++++++++++++
 .../src/main/resources/R-scripts/tukeys.r          | 26 +++++++
 .../src/main/resources/R-scripts/util.R            | 19 ++++++
 8 files changed, 312 insertions(+), 10 deletions(-)

diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java
index 8d1e520..71ad66d 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java
@@ -31,8 +31,7 @@ public class RFunctionInvoker {
 
     public static ResultSet tukeys(DataSet trainData, DataSet testData, Map<String, String> configs) {
         try {
-            r.eval("library(ambarimetricsAD)");
-            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/tukeys.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+            r.eval("source('tukeys.r', echo=TRUE)");
 
             int n = Integer.parseInt(configs.get("tukeys.n"));
             r.eval("n <- " + n);
@@ -57,8 +56,7 @@ public class RFunctionInvoker {
 
     public static ResultSet ema_global(DataSet trainData, DataSet testData, Map<String, String> configs) {
         try {
-            r.eval("library(ambarimetricsAD)");
-            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/ema.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+            r.eval("source('ema.R', echo=TRUE)");
 
             int n = Integer.parseInt(configs.get("ema.n"));
             r.eval("n <- " + n);
@@ -87,8 +85,7 @@ public class RFunctionInvoker {
 
     public static ResultSet ema_daily(DataSet trainData, DataSet testData, Map<String, String> configs) {
         try {
-            r.eval("library(ambarimetricsAD)");
-            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/ema.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+            r.eval("source('ema.R', echo=TRUE)");
 
             int n = Integer.parseInt(configs.get("ema.n"));
             r.eval("n <- " + n);
@@ -117,8 +114,7 @@ public class RFunctionInvoker {
 
     public static ResultSet ksTest(DataSet trainData, DataSet testData, Map<String, String> configs) {
         try {
-            r.eval("library(ambarimetricsAD)");
-            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/kstest.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+            r.eval("source('kstest.r', echo=TRUE)");
 
             double p_value = Double.parseDouble(configs.get("ks.p_value"));
             r.eval("p_value <- " + p_value);
@@ -144,8 +140,7 @@ public class RFunctionInvoker {
 
     public static ResultSet hsdev(DataSet trainData, DataSet testData, Map<String, String> configs) {
         try {
-            r.eval("library(ambarimetricsAD)");
-            r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/hsdev.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)");
+            r.eval("source('hsdev.r', echo=TRUE)");
 
             int n = Integer.parseInt(configs.get("hsdev.n"));
             r.eval("n <- " + n);
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R
new file mode 100644
index 0000000..d3188f0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R
@@ -0,0 +1,79 @@
+# EMA <- w * EMA + (1 - w) * x
+# EMS <- sqrt( w * EMS^2 + (1 - w) * (x - EMA)^2 )
+# Alarm = abs(x - EMA) > n * EMS
+
+ema_global <- function(train_data, test_data, w, n) {
+  # Flag test points that deviate from an exponentially weighted moving
+  # average (EMA) by more than n exponentially weighted deviations (EMS).
+  #   train_data: numeric vector of values, or a table with values in column 2.
+  #   test_data:  table with timestamps in column 1 and values in column 2.
+  #   w: weight kept from the previous EMA/EMS; n: alarm threshold multiplier.
+  # Returns a data frame with columns TS and Value (empty if no anomalies).
+
+  # Iterating a data frame with `for` walks its columns, not its rows, so
+  # reduce a 2-d training input to its value column first.
+  train_values <- if (length(dim(train_data)) == 2) train_data[, 2] else train_data
+  ema <- 0
+  ems <- 0
+
+  #Train Step
+  for (x in train_values) {
+    ema <- w*ema + (1-w)*x
+    ems <- sqrt(w* ems^2 + (1 - w)*(x - ema)^2)
+  }
+
+  # seq_len() makes an empty test set a no-op; 1:0 would visit rows 1 and 0.
+  num_test <- nrow(test_data)
+  flagged <- logical(num_test)
+  for (i in seq_len(num_test)) {
+    x <- test_data[i,2]
+    if (abs(x - ema) > n*ems) flagged[i] <- TRUE
+    ema <- w*ema + (1-w)*x
+    ems <- sqrt(w* ems^2 + (1 - w)*(x - ema)^2)
+  }
+  if (!any(flagged)) {
+    return (data.frame())
+  }
+  data.frame(TS = as.numeric(test_data[flagged,1]), Value = as.numeric(test_data[flagged,2]))
+}
+
+ema_daily <- function(train_data, test_data, w, n) {
+  # Exponentially weighted moving average (EMA) / deviation (EMS) test that
+  # keeps a separate (EMA, EMS) pair for each day of the week, so every test
+  # point is compared against the state learned for its own weekday.
+  #   train_data, test_data: tables with epoch-millisecond timestamps in
+  #                          column 1 and values in column 2.
+  #   w: weight kept from the previous EMA/EMS; n: alarm threshold multiplier.
+  # Returns a data frame with columns TS and Value (empty if no anomalies).
+
+  # POSIXlt$wday is 0 (Sunday) .. 6, so add 1 to get a valid 1..7 R index.
+  # Training previously indexed with the raw wday, which silently dropped
+  # Sundays and trained every other day into the wrong slot.
+  slot_of <- function(ts) {
+    as.POSIXlt(as.numeric(ts)/1000, origin = "1970-01-01", tz = "GMT")$wday + 1
+  }
+  ema <- vector("numeric", 7)
+  ems <- vector("numeric", 7)
+
+  #Train Step
+  for (i in seq_len(nrow(train_data))) {
+    x <- train_data[i,2]
+    index <- slot_of(train_data[i,1])
+    ema[index] <- w*ema[index] + (1-w)*x
+    ems[index] <- sqrt(w* ems[index]^2 + (1 - w)*(x - ema[index])^2)
+  }
+
+  # Test step: flag first, then fold the point into that weekday's state.
+  num_test <- nrow(test_data)
+  flagged <- logical(num_test)
+  for (i in seq_len(num_test)) {
+    x <- test_data[i,2]
+    index <- slot_of(test_data[i,1])
+    if (abs(x - ema[index]) > n*ems[index]) flagged[i] <- TRUE
+    ema[index] <- w*ema[index] + (1-w)*x
+    ems[index] <- sqrt(w* ems[index]^2 + (1 - w)*(x - ema[index])^2)
+  }
+  if (!any(flagged)) {
+    return (data.frame())
+  }
+  data.frame(TS = as.numeric(test_data[flagged,1]), Value = as.numeric(test_data[flagged,2]))
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
new file mode 100644
index 0000000..ff8a8f7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
@@ -0,0 +1,60 @@
+hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval, period) {
+  # Flag the test window when its median differs from the median of same-weekday
+  # history by more than n historic standard deviations. Both tables hold epoch-ms
+  # timestamps in column 1 and values in column 2; history starts
+  # num_historic_periods * period before the test start, rounded down to interval.
+  # Returns a one-row data frame describing the anomaly, or an empty data frame.
+  anomalies <- data.frame()
+
+  test_start <- test_data[1,1]
+  # Last row of the window; length(test_data[1,]) would count columns instead.
+  test_end <- test_data[nrow(test_data),1]
+  cat ("\n test_start : ", as.numeric(test_start))
+  train_start <- test_start - num_historic_periods*period
+  cat ("\n train_start : ", as.numeric(train_start))
+  # round to start of day
+  train_start <- train_start - (train_start %% interval)
+  cat ("\n train_start after rounding: ", as.numeric(train_start))
+
+  time <- as.POSIXlt(as.numeric(test_data[1,1])/1000, origin = "1970-01-01" ,tz = "GMT")
+  test_data_day <- time$wday
+  h_data <- c()
+  # Walk newest-to-oldest; rev(seq_len()) keeps an empty table from being indexed.
+  for ( i in rev(seq_len(nrow(train_data)))) {
+    ts <- train_data[i,1]
+    if ( ts < train_start) {
+      cat ("\n Breaking out of loop : ", ts)
+      break
+    }
+    time <- as.POSIXlt(as.numeric(ts)/1000, origin = "1970-01-01" ,tz = "GMT")
+    if (time$wday == test_data_day) {
+      h_data <- c(h_data, train_data[i,2])
+    }
+  }
+
+  cat ("\n Train data length : ", length(train_data[,1]))
+  cat ("\n Test data length : ", length(test_data[,1]))
+  cat ("\n Historic data length : ", length(h_data))
+  if (length(h_data) < 2*length(test_data[,1])) {
+    cat ("\nNot enough training data")
+    return (anomalies)
+  }
+
+  past_median <- median(h_data)
+  cat ("\npast_median : ", past_median)
+  past_sd <- sd(h_data)
+  cat ("\npast_sd : ", past_sd)
+  curr_median <- median(test_data[,2])
+  cat ("\ncurr_median : ", curr_median)
+
+  if (abs(curr_median - past_median) > n * past_sd) {
+    anomaly <- c(test_start, test_end, curr_median, past_median, past_sd)
+    anomalies <- rbind(anomalies, anomaly)
+  }
+
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", " Past SD")
+  }
+
+  return (anomalies)
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R
new file mode 100644
index 0000000..1e0c534
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R
@@ -0,0 +1,35 @@
+ams_iforest <- function(url, train_start, train_end, test_start, test_end, threshold_score) {
+  # Score multi-metric data with an isolation forest trained on the rows whose
+  # TS lies in [train_start, train_end] and report the rows in
+  # [test_start, test_end] whose anomaly score exceeds threshold_score.
+  # Returns a data frame with columns TS, Anomaly Score, Path length (or empty).
+  # NOTE(review): get_data, IsolationTrees and AnomalyScore must already be in scope.
+  res <- get_data(url)
+  num_metrics <- length(res$metrics)
+  anomalies <- data.frame()
+  # Build one TS-keyed table with a column per metric. seq_along() also covers a
+  # single-metric response, where 2:num_metrics would have run backwards.
+  data <- NULL
+  for (i in seq_along(res$metrics)) {
+    df <- data.frame(as.numeric(names(res$metrics[[i]]$metrics)), as.numeric(res$metrics[[i]]$metrics))
+    names(df) <- c("TS", res$metrics[[i]]$metricname)
+    data <- if (is.null(data)) df else merge(data, df)
+  }
+  # Filter on the merged table's own TS; df only holds the last metric's rows.
+  value_cols <- seq_len(num_metrics) + 1
+  algo_data <- data[which(data$TS >= train_start & data$TS <= train_end), ][value_cols]
+  iForest <- IsolationTrees(algo_data)
+  test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+
+  if_res <- AnomalyScore(test_data[value_cols], iForest)
+  for (i in seq_along(if_res$outF)) {
+    if (if_res$outF[i] > threshold_score) {
+      anomaly <- c(test_data[i,1], if_res$outF[i], if_res$pathLength[i])
+      anomalies <- rbind(anomalies, anomaly)
+    }
+  }
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Anomaly Score", "Path length")
+  }
+  return (anomalies)
+}
\ No newline at end of file
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
new file mode 100644
index 0000000..af21038
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
@@ -0,0 +1,21 @@
+ams_ks <- function(train_data, test_data, p_value) {
+  # Two-sample Kolmogorov-Smirnov test of the training values against the values
+  # in column 2 of test_data; the whole test window is flagged when the test's
+  # p-value falls below p_value. train_data may be a numeric vector or a table
+  # whose second column holds the values.
+  # Returns a data frame with columns TS Start, TS End, D, p-value (or empty).
+  train_values <- if (length(dim(train_data)) == 2) train_data[, 2] else train_data
+  anomalies <- data.frame()
+  res <- ks.test(train_values, test_data[,2])
+
+  if (res$p.value < p_value) {
+    # nrow() gives the last row; length() of a data frame is its column count.
+    anomaly <- c(test_data[1,1], test_data[nrow(test_data),1], unname(res$statistic), res$p.value)
+    anomalies <- rbind(anomalies, anomaly)
+  }
+
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS Start", "TS End", "D", "p-value")
+  }
+  return (anomalies)
+}
\ No newline at end of file
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R
new file mode 100644
index 0000000..e66049f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R
@@ -0,0 +1,67 @@
+# Anomaly accumulators, one per method. init() resets them with `<<-`; a plain `<-` would only create locals.
+tukeys_anomalies <- data.frame()
+ema_global_anomalies <- data.frame()
+ema_daily_anomalies <- data.frame()
+ks_anomalies <- data.frame()
+hsdev_anomalies <- data.frame()
+init <- function() {
+  tukeys_anomalies <<- data.frame()
+  ema_global_anomalies <<- data.frame()
+  ema_daily_anomalies <<- data.frame()
+  ks_anomalies <<- data.frame()
+  hsdev_anomalies <<- data.frame()
+}
+
+test_methods <- function(data) {
+  # Backtest hsdev_daily() over consecutive day-bounded windows of data.
+  init()
+  # Windows are cut with get_next_day_boundary(); each test window is scored
+  # against all rows before it and the result is appended to the global
+  # hsdev_anomalies, which is also the return value. Needs a data$TS column.
+
+  limit <- data[length(data[,1]),1]
+  step <- data[2,1] - data[1,1]
+
+  train_start <- data[1,1]
+  train_end <- get_next_day_boundary(train_start, step, limit)
+  test_start <- train_end + step
+  test_end <- get_next_day_boundary(test_start, step, limit)
+  i <- 1
+  day <- 24*60*60*1000
+
+  while (test_start < limit) {
+
+    print (i)
+    i <- i + 1
+    train_data <- data[which(data$TS >= train_start & data$TS <= train_end),]
+    test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+
+    #tukeys_anomalies <<- rbind(tukeys_anomalies, ams_tukeys(train_data, test_data, 3))
+    #ema_global_anomalies <<- rbind(ema_global_anomalies, ema_global(train_data, test_data, 0.9, 3))
+    #ema_daily_anomalies <<- rbind(ema_daily_anomalies, ema_daily(train_data, test_data, 0.9, 3))
+    #ks_anomalies <<- rbind(ks_anomalies, ams_ks(train_data, test_data, 0.05))
+    hsdev_train_data <- data[which(data$TS < test_start),]
+    hsdev_anomalies <<- rbind(hsdev_anomalies, hsdev_daily(hsdev_train_data, test_data, 3, 3, day, 7*day))
+
+    train_start <- test_start
+    train_end <- get_next_day_boundary(train_start, step, limit)
+    test_start <- train_end + step
+    test_end <- get_next_day_boundary(test_start, step, limit)
+  }
+  return (hsdev_anomalies)
+}
+
+# First value in start, start+step, ... whose remainder modulo one day (in ms) equals
+# boundary_offset (default 8 h); -1 if start > limit, else the first value past limit.
+get_next_day_boundary <- function(start, step, limit, boundary_offset = 28800000) {
+  if (start > limit) {
+    return (-1)
+  }
+  while (start <= limit) {
+    if ((start %% (24*60*60*1000)) == boundary_offset) {
+      return (start)
+    }
+    start <- start + step
+  }
+  return (start)
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
new file mode 100644
index 0000000..38f71f2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
@@ -0,0 +1,26 @@
+ams_tukeys <- function(train_data, test_data, n) {
+  # Tukey's fences: flag test values outside [Q1 - n*IQR, Q3 + n*IQR], with the
+  # quartiles taken from column 2 of train_data.
+  #   train_data, test_data: tables with timestamps in column 1 and values
+  #                          in column 2.
+  #   n: fence width as a multiple of the inter-quartile range (IQR).
+  # Returns a data frame with columns TS and Value (empty if no anomalies).
+
+  quantiles <- unname(quantile(train_data[,2]))
+  iqr <- quantiles[4] - quantiles[2]
+
+  # The fences do not depend on the test point, so compute them once.
+  lb <- quantiles[2] - n*iqr
+  ub <- quantiles[4] + n*iqr
+
+  # Vectorised comparison; it also copes with an empty test set, where a
+  # 1:length() loop would have indexed rows 1 and 0.
+  values <- test_data[,2]
+  outside <- which(values < lb | values > ub)
+
+  if (length(outside) == 0) {
+    return (data.frame())
+  }
+
+  data.frame(TS = test_data[outside,1], Value = values[outside])
+}
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R
new file mode 100644
index 0000000..eb19d37
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R
@@ -0,0 +1,19 @@
+#url_prefix = 'http://104.196.95.78:3000/api/datasources/proxy/1/ws/v1/timeline/metrics?'
+#url_suffix = '&startTime=1459972944&endTime=1491508944&precision=MINUTES'
+#data_url <- paste(url_prefix, query, sep ="")
+#data_url <- paste(data_url, url_suffix, sep="")
+
+get_data <- function(url) {
+  # Read url and parse the first line of the response as JSON (via rjson).
+  library(rjson)
+  fromJSON(readLines(url)[1])
+}
+
+find_index <- function(data, ts) {
+  # Position of the first element of data numerically equal to ts, or -1.
+  #   data: vector to search; ts: value to look for (both coerced to numeric).
+  # which() copes with empty data and NA entries, where a 1:length() loop
+  # would fail on the comparison.
+  hits <- which(as.numeric(data) == as.numeric(ts))
+  if (length(hits) == 0) -1 else hits[1]
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
avijayan@apache.org.