You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/04/01 17:45:13 UTC

[incubator-pinot] branch master updated: [TE] Add early terminate in detection loop (#4042)

This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 046578f  [TE] Add early terminate in detection loop (#4042)
046578f is described below

commit 046578f38dcf28a7e9ddb5bdd7b4fbb9b620deb0
Author: Xiaohui Sun <xh...@linkedin.com>
AuthorDate: Mon Apr 1 10:45:08 2019 -0700

    [TE] Add early terminate in detection loop (#4042)
---
 .../detection/wrapper/AnomalyDetectorWrapper.java  | 32 ++++++++++++++++++----
 1 file changed, 27 insertions(+), 5 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index f8ebe7c..b888379 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -76,11 +76,14 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
   private static final String PROP_TIMEZONE = "timezone";
   private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
   private static final String PROP_CACHE_PERIOD_LOOKBACK = "cachingPeriodLookback";
-  private static final long DEFAULT_CACHING_PERIOD_LOOKBACK = TimeUnit.DAYS.toMillis(0);
+  private static final long DEFAULT_CACHING_PERIOD_LOOKBACK = TimeUnit.DAYS.toMillis(-1);
   private static final long CACHING_PERIOD_LOOKBACK_DAILY = TimeUnit.DAYS.toMillis(90);
   private static final long CACHING_PERIOD_LOOKBACK_HOURLY = TimeUnit.DAYS.toMillis(60);
   // disable minute level cache warm up
   private static final long CACHING_PERIOD_LOOKBACK_MINUTELY = -1;
+  // fail the detection job early if each of the first 10 windows failed in succession
+  private static final long EARLY_TERMINATE_WINDOW = 10;
+
 
   private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectorWrapper.class);
 
@@ -155,8 +158,13 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
 
     List<Interval> monitoringWindows = this.getMonitoringWindows();
     List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
+    int totalWindows = monitoringWindows.size();
     int successWindows = 0;
-    for (int i = 0; i < monitoringWindows.size(); i++) {
+    Exception lastException = null;
+    for (int i = 0; i < totalWindows; i++) {
+      earlyTerminate(i, successWindows, totalWindows, lastException);
+
+      // run detection
       Interval window = monitoringWindows.get(i);
       List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>();
       try {
@@ -169,16 +177,20 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
         successWindows++;
       }
       catch (DetectorDataInsufficientException e) {
+        lastException = e;
         LOG.warn("[DetectionConfigID{}] Insufficient data ro run detection for window {} to {}.", this.config.getId(), window.getStart(), window.getEnd());
       }
       catch (Exception e) {
+        lastException = e;
         LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
       }
       anomalies.addAll(anomaliesForOneWindow);
     }
-    if (successWindows == 0 && monitoringWindows.size() > 0) {
-      LOG.error("All {} detection windows failed for config {} metricUrn {}.", monitoringWindows.size(), config.getId(), metricUrn);
-      throw new DetectorException("All detection windows failed.");
+
+    // throw exception if all windows failed
+    if (successWindows == 0 && totalWindows > 0) {
+      LOG.error("All {} detection windows failed for config {} metricUrn {}.", totalWindows, config.getId(), metricUrn);
+      throw new DetectorException("All " + totalWindows + " detection windows failed.", lastException);
     }
 
     for (MergedAnomalyResultDTO anomaly : anomalies) {
@@ -194,6 +206,16 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
         Collectors.toList()), lastTimeStamp);
   }
 
+  private void earlyTerminate(int currentWindows, int successWindows, int totalWindows, Exception lastException)
+      throws DetectorException {
+    // abort the job when none of the first EARLY_TERMINATE_WINDOW windows succeeded; currentWindows is the 0-based loop index
+    if (currentWindows == EARLY_TERMINATE_WINDOW && successWindows == 0) { // exact equality: this can only fire at that one index
+      LOG.error("Successive first {} detection windows failed for config {} metricUrn {}.", EARLY_TERMINATE_WINDOW, config.getId(), metricUrn);
+      throw new DetectorException(String.format("Successive first %d/%d detection windows failed.", EARLY_TERMINATE_WINDOW, totalWindows),
+          lastException);
+    }
+  }
+
   // guess-timate next time stamp
   // there are two cases. If the data is complete, next detection starts from the end time of this detection
   // If data is incomplete, next detection starts from the latest available data's time stamp plus the one time granularity.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org