You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/03/29 20:12:43 UTC

[incubator-pinot] branch master updated: Fail pipeline all windows failed (#4032)

This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 334b918  Fail pipeline all windows failed (#4032)
334b918 is described below

commit 334b918c2e7832776baf52e62bb48d4c806e6fe0
Author: Xiaohui Sun <xh...@linkedin.com>
AuthorDate: Fri Mar 29 13:12:38 2019 -0700

    Fail pipeline all windows failed (#4032)
    
    * [TE] Reduce log for data insufficient exception.
    
    * [TE] fail the pipeline if all detection windows failed
---
 .../thirdeye/detection/algorithm/MergeWrapper.java |  5 ++++
 .../detection/wrapper/AnomalyDetectorWrapper.java  | 27 +++++++++++++++++-----
 2 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
index d4dabf1..83be36c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
@@ -38,9 +38,12 @@ import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.detection.ConfigUtils;
 import org.apache.pinot.thirdeye.detection.DataProvider;
 import org.apache.pinot.thirdeye.detection.DetectionPipeline;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineJob;
 import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
 import org.apache.pinot.thirdeye.detection.DetectionUtils;
 import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -53,6 +56,7 @@ public class MergeWrapper extends DetectionPipeline {
   private static final String PROP_MERGE_KEY = "mergeKey";
   private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
   private static final int NUMBER_OF_SPLITED_ANOMALIES_LIMIT = 1000;
+  private static final Logger LOG = LoggerFactory.getLogger(MergeWrapper.class);
 
   protected static final Comparator<MergedAnomalyResultDTO> COMPARATOR = new Comparator<MergedAnomalyResultDTO>() {
     @Override
@@ -212,6 +216,7 @@ public class MergeWrapper extends DetectionPipeline {
     int anomalyCountAfterSplit = (int) Math.ceil((anomaly.getEndTime() - anomaly.getStartTime()) / (double) maxDuration);
     if (anomalyCountAfterSplit > NUMBER_OF_SPLITED_ANOMALIES_LIMIT) {
       // if the number of anomalies after split is more than the limit, don't split
+      LOG.warn("Exceeded max number of split count. maxDuration = {}, anomaly split count = {}", maxDuration, anomalyCountAfterSplit);
       return Collections.singleton(anomaly);
     }
     Set<MergedAnomalyResultDTO> result = new HashSet<>();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index 09a64bb..e8b12db 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -41,6 +41,8 @@ import org.apache.pinot.thirdeye.detection.DetectionPipeline;
 import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
 import org.apache.pinot.thirdeye.detection.DetectionUtils;
 import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.exception.DetectorDataInsufficientException;
+import org.apache.pinot.thirdeye.detection.spi.exception.DetectorException;
 import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
 import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
 import org.joda.time.DateTime;
@@ -73,7 +75,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
   private static final String PROP_TIMEZONE = "timezone";
   private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
   private static final String PROP_CACHE_PERIOD_LOOKBACK = "cachingPeriodLookback";
-  private static final long DEFAULT_CACHING_PERIOD_LOOKBACK = TimeUnit.DAYS.toMillis(30);
+  private static final long DEFAULT_CACHING_PERIOD_LOOKBACK = TimeUnit.DAYS.toMillis(0);
   private static final long CACHING_PERIOD_LOOKBACK_DAILY = TimeUnit.DAYS.toMillis(90);
   private static final long CACHING_PERIOD_LOOKBACK_HOURLY = TimeUnit.DAYS.toMillis(60);
   // disable minute level cache warm up
@@ -144,7 +146,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     // 1. get the last time stamp for the time series.
     // 2. to calculate current values and  baseline values for the anomalies detected
     // 3. anomaly detection current and baseline time series value
-    if( this.cachingPeriodLookback >= 0) {
+    if( this.cachingPeriodLookback > 0) {
       MetricSlice cacheSlice = MetricSlice.from(this.metricEntity.getId(), startTime - cachingPeriodLookback, endTime,
           this.metricEntity.getFilters());
       this.provider.fetchTimeseries(Collections.singleton(cacheSlice));
@@ -152,18 +154,31 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
 
     List<Interval> monitoringWindows = this.getMonitoringWindows();
     List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
-    for (Interval window : monitoringWindows) {
+    int successWindows = 0;
+    for (int i = 0; i < monitoringWindows.size(); i++) {
+      Interval window = monitoringWindows.get(i);
       List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>();
       try {
-        LOG.info("[New Pipeline] running detection for config {} metricUrn {}. start time {}, end time {}", config.getId(), metricUrn, window.getStart(), window.getEnd());
+        LOG.info("[Pipeline] running detection for config {} metricUrn {} window ({}/{}) - start {} end {}",
+            config.getId(), metricUrn, i, monitoringWindows.size(), window.getStart(), window.getEnd());
         long ts = System.currentTimeMillis();
         anomaliesForOneWindow = anomalyDetector.runDetection(window, this.metricUrn);
-        LOG.info("[New Pipeline] run anomaly detection for window {} - {} used {} milliseconds, detected {} anomalies", window.getStart(), window.getEnd(), System.currentTimeMillis() - ts, anomaliesForOneWindow.size());
-      } catch (Exception e) {
+        LOG.info("[Pipeline] run anomaly detection for window ({}/{}) - start {} end {}  used {} milliseconds, detected {} anomalies",
+            i, monitoringWindows.size(), window.getStart(), window.getEnd(), System.currentTimeMillis() - ts, anomaliesForOneWindow.size());
+        successWindows++;
+      }
+      catch (DetectorDataInsufficientException e) {
+        LOG.warn("[DetectionConfigID{}] Insufficient data ro run detection for window {} to {}.", this.config.getId(), window.getStart(), window.getEnd());
+      }
+      catch (Exception e) {
         LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
       }
       anomalies.addAll(anomaliesForOneWindow);
     }
+    if (successWindows == 0 && monitoringWindows.size() > 0) {
+      LOG.error("All {} detection windows failed for config {} metricUrn {}.", monitoringWindows.size(), config.getId(), metricUrn);
+      throw new DetectorException("All detection windows failed.");
+    }
 
     for (MergedAnomalyResultDTO anomaly : anomalies) {
       anomaly.setDetectionConfigId(this.config.getId());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org