You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/04/02 16:50:46 UTC

[incubator-pinot] branch master updated: [TE] Speed up minute level detection (#4053)

This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 153c52d  [TE] Speed up minute level detection (#4053)
153c52d is described below

commit 153c52de62e282f5778e0534d626922701b96355
Author: Xiaohui Sun <xh...@linkedin.com>
AuthorDate: Tue Apr 2 09:50:42 2019 -0700

    [TE] Speed up minute level detection (#4053)
    
    * [TE] Speed up minute level detection
    
    * [TE] Speed up minute level detection
---
 .../pinot/thirdeye/detection/alert/AlertUtils.java |  3 --
 .../detection/wrapper/AnomalyDetectorWrapper.java  | 52 ++++++++++++++--------
 2 files changed, 33 insertions(+), 22 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
index 8e6adf3..a298267 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
@@ -24,9 +24,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
-import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import com.mysql.jdbc.StringUtils;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -38,7 +36,6 @@ import javax.mail.internet.InternetAddress;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 
-
 public class AlertUtils {
   private AlertUtils() {
     //left blank
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index b888379..8b499ca 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -81,9 +81,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
   private static final long CACHING_PERIOD_LOOKBACK_HOURLY = TimeUnit.DAYS.toMillis(60);
   // disable minute level cache warm up
   private static final long CACHING_PERIOD_LOOKBACK_MINUTELY = -1;
-  // fail detection job if it failed successively for the first 10 windows
-  private static final long EARLY_TERMINATE_WINDOW = 10;
-
+  // fail detection job if it failed successively for the first 5 windows
+  private static final long EARLY_TERMINATE_WINDOW = 5;
 
   private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectorWrapper.class);
 
@@ -92,18 +91,18 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
 
   private final int windowDelay;
   private final TimeUnit windowDelayUnit;
-  private final int windowSize;
-  private final TimeUnit windowUnit;
+  private int windowSize;
+  private TimeUnit windowUnit;
   private final MetricConfigDTO metric;
   private final MetricEntity metricEntity;
   private final boolean isMovingWindowDetection;
   // need to specify run frequency for minute level detection. Used for moving monitoring window alignment, default to be 15 minutes.
   private final TimeGranularity functionFrequency;
   private final String detectorName;
-  private final long windowSizeMillis;
+  private long windowSizeMillis;
   private final DatasetConfigDTO dataset;
   private final DateTimeZone dateTimeZone;
-  private final Period bucketPeriod;
+  private Period bucketPeriod;
   private final long cachingPeriodLookback;
 
   public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
@@ -142,6 +141,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     this.bucketPeriod = bucketStr == null ? this.getBucketSizePeriodForDataset() : Period.parse(bucketStr);
     this.cachingPeriodLookback = config.getProperties().containsKey(PROP_CACHE_PERIOD_LOOKBACK) ?
         MapUtils.getLong(config.getProperties(), PROP_CACHE_PERIOD_LOOKBACK) : getCachingPeriodLookback(this.dataset.bucketTimeGranularity());
+
+    speedUpMinuteLevelDetection();
   }
 
   @Override
@@ -168,12 +169,13 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
       Interval window = monitoringWindows.get(i);
       List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>();
       try {
-        LOG.info("[Pipeline] running detection for config {} metricUrn {} window ({}/{}) - start {} end {}",
-            config.getId(), metricUrn, i, monitoringWindows.size(), window.getStart(), window.getEnd());
+        LOG.info("[Pipeline] start detection for config {} metricUrn {} window ({}/{}) - start {} end {}",
+            config.getId(), metricUrn, i + 1, monitoringWindows.size(), window.getStart(), window.getEnd());
         long ts = System.currentTimeMillis();
         anomaliesForOneWindow = anomalyDetector.runDetection(window, this.metricUrn);
-        LOG.info("[Pipeline] run anomaly detection for window ({}/{}) - start {} end {}  used {} milliseconds, detected {} anomalies",
-            i, monitoringWindows.size(), window.getStart(), window.getEnd(), System.currentTimeMillis() - ts, anomaliesForOneWindow.size());
+        LOG.info("[Pipeline] end detection for config {} metricUrn {} window ({}/{}) - start {} end {} used {} milliseconds, detected {} anomalies",
+            config.getId(), metricUrn, i + 1, monitoringWindows.size(), window.getStart(), window.getEnd(),
+            System.currentTimeMillis() - ts, anomaliesForOneWindow.size());
         successWindows++;
       }
       catch (DetectorDataInsufficientException e) {
@@ -256,13 +258,6 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
         for (Interval window : monitoringWindows){
           LOG.info("Will run detection in window {}", window);
         }
-        // pre cache the time series for the whole detection time period instead of fetching for each window
-        if (this.cachingPeriodLookback >= 0) {
-          MetricSlice cacheSlice =
-              MetricSlice.from(this.metricEntity.getId(), startTime - cachingPeriodLookback, endTime,
-                  this.metricEntity.getFilters(), toTimeGranularity(this.bucketPeriod));
-          this.provider.fetchTimeseries(Collections.singleton(cacheSlice));
-        }
         return monitoringWindows;
       } catch (Exception e) {
         LOG.info("can't generate moving monitoring windows, calling with single detection window", e);
@@ -371,4 +366,23 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
       return new TimeGranularity(period.getMillis(), TimeUnit.MILLISECONDS);
     }
   }
-}
+
+  /**
+   * Speed up minute level detection.
+   *
+   * It will generate lots of small windows if the bucket size smaller than 15 minutes and detection window larger than 1 day.
+   * This optimization is to change the bucket period, window size and window unit to 1 day.
+   * Please note we need to change all the three parameters together since the detection window is:
+   * [bucketPeriod_end - windowSize * windowUnit, bucketPeriod_end]
+   *
+   * It is possible to have bucketPeriod as 5 minutes but windowSize is 6 hours.
+   */
+  private void speedUpMinuteLevelDetection() {
+    if (bucketPeriod.getMinutes() <= 15 && endTime - startTime >= Period.days(1).getMillis()) {
+      bucketPeriod = Period.days(1);
+      windowSize = 1;
+      windowUnit = TimeUnit.DAYS;
+      windowSizeMillis = TimeUnit.MILLISECONDS.convert(windowSize, windowUnit);
+    }
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org