You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/04/02 16:50:46 UTC
[incubator-pinot] branch master updated: [TE] Speed up minute level detection (#4053)
This is an automated email from the ASF dual-hosted git repository.
xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 153c52d [TE] Speed up minute level detection (#4053)
153c52d is described below
commit 153c52de62e282f5778e0534d626922701b96355
Author: Xiaohui Sun <xh...@linkedin.com>
AuthorDate: Tue Apr 2 09:50:42 2019 -0700
[TE] Speed up minute level detection (#4053)
* [TE] Speed up minute level detection
* [TE] Speed up minute level detection
---
.../pinot/thirdeye/detection/alert/AlertUtils.java | 3 --
.../detection/wrapper/AnomalyDetectorWrapper.java | 52 ++++++++++++++--------
2 files changed, 33 insertions(+), 22 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
index 8e6adf3..a298267 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
@@ -24,9 +24,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
-import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import com.mysql.jdbc.StringUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -38,7 +36,6 @@ import javax.mail.internet.InternetAddress;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
-
public class AlertUtils {
private AlertUtils() {
//left blank
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index b888379..8b499ca 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -81,9 +81,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
private static final long CACHING_PERIOD_LOOKBACK_HOURLY = TimeUnit.DAYS.toMillis(60);
// disable minute level cache warm up
private static final long CACHING_PERIOD_LOOKBACK_MINUTELY = -1;
- // fail detection job if it failed successively for the first 10 windows
- private static final long EARLY_TERMINATE_WINDOW = 10;
-
+ // fail detection job if it failed successively for the first 5 windows
+ private static final long EARLY_TERMINATE_WINDOW = 5;
private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectorWrapper.class);
@@ -92,18 +91,18 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
private final int windowDelay;
private final TimeUnit windowDelayUnit;
- private final int windowSize;
- private final TimeUnit windowUnit;
+ private int windowSize;
+ private TimeUnit windowUnit;
private final MetricConfigDTO metric;
private final MetricEntity metricEntity;
private final boolean isMovingWindowDetection;
// need to specify run frequency for minute level detection. Used for moving monitoring window alignment, default to be 15 minutes.
private final TimeGranularity functionFrequency;
private final String detectorName;
- private final long windowSizeMillis;
+ private long windowSizeMillis;
private final DatasetConfigDTO dataset;
private final DateTimeZone dateTimeZone;
- private final Period bucketPeriod;
+ private Period bucketPeriod;
private final long cachingPeriodLookback;
public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
@@ -142,6 +141,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
this.bucketPeriod = bucketStr == null ? this.getBucketSizePeriodForDataset() : Period.parse(bucketStr);
this.cachingPeriodLookback = config.getProperties().containsKey(PROP_CACHE_PERIOD_LOOKBACK) ?
MapUtils.getLong(config.getProperties(), PROP_CACHE_PERIOD_LOOKBACK) : getCachingPeriodLookback(this.dataset.bucketTimeGranularity());
+
+ speedUpMinuteLevelDetection();
}
@Override
@@ -168,12 +169,13 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
Interval window = monitoringWindows.get(i);
List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>();
try {
- LOG.info("[Pipeline] running detection for config {} metricUrn {} window ({}/{}) - start {} end {}",
- config.getId(), metricUrn, i, monitoringWindows.size(), window.getStart(), window.getEnd());
+ LOG.info("[Pipeline] start detection for config {} metricUrn {} window ({}/{}) - start {} end {}",
+ config.getId(), metricUrn, i + 1, monitoringWindows.size(), window.getStart(), window.getEnd());
long ts = System.currentTimeMillis();
anomaliesForOneWindow = anomalyDetector.runDetection(window, this.metricUrn);
- LOG.info("[Pipeline] run anomaly detection for window ({}/{}) - start {} end {} used {} milliseconds, detected {} anomalies",
- i, monitoringWindows.size(), window.getStart(), window.getEnd(), System.currentTimeMillis() - ts, anomaliesForOneWindow.size());
+ LOG.info("[Pipeline] end detection for config {} metricUrn {} window ({}/{}) - start {} end {} used {} milliseconds, detected {} anomalies",
+ config.getId(), metricUrn, i + 1, monitoringWindows.size(), window.getStart(), window.getEnd(),
+ System.currentTimeMillis() - ts, anomaliesForOneWindow.size());
successWindows++;
}
catch (DetectorDataInsufficientException e) {
@@ -256,13 +258,6 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
for (Interval window : monitoringWindows){
LOG.info("Will run detection in window {}", window);
}
- // pre cache the time series for the whole detection time period instead of fetching for each window
- if (this.cachingPeriodLookback >= 0) {
- MetricSlice cacheSlice =
- MetricSlice.from(this.metricEntity.getId(), startTime - cachingPeriodLookback, endTime,
- this.metricEntity.getFilters(), toTimeGranularity(this.bucketPeriod));
- this.provider.fetchTimeseries(Collections.singleton(cacheSlice));
- }
return monitoringWindows;
} catch (Exception e) {
LOG.info("can't generate moving monitoring windows, calling with single detection window", e);
@@ -371,4 +366,23 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
return new TimeGranularity(period.getMillis(), TimeUnit.MILLISECONDS);
}
}
-}
+
+  /**
+   * Speeds up minute level detection by coarsening its monitoring windows.
+   *
+   * Minute level detection generates lots of small windows when the bucket size is 15 minutes or less and the
+   * detection range spans at least 1 day. This optimization changes the bucket period, window size and window unit
+   * to 1 day. All three parameters must be changed together since the detection window is:
+   * [bucketPeriod_end - windowSize * windowUnit, bucketPeriod_end]
+   *
+   * It is possible to have bucketPeriod as 5 minutes but windowSize as 6 hours.
+   *
+   * Joda's {@code Period.getMinutes()} and {@code Period.getMillis()} return a single field only
+   * (e.g. {@code Period.hours(1).getMinutes() == 0} and {@code Period.days(1).getMillis() == 0}). Using them here
+   * would make hourly and daily configs qualify as well, so both thresholds are compared as standard durations.
+   * Periods with year or month fields have no fixed length and are never treated as minute level.
+   */
+  private void speedUpMinuteLevelDetection() {
+    // toStandardDuration() throws UnsupportedOperationException for periods containing years or months
+    boolean fixedLength = bucketPeriod.getYears() == 0 && bucketPeriod.getMonths() == 0;
+    boolean isMinuteLevel = fixedLength
+        && bucketPeriod.toStandardDuration().getMillis() <= TimeUnit.MINUTES.toMillis(15);
+    boolean spansAtLeastOneDay = endTime - startTime >= TimeUnit.DAYS.toMillis(1);
+    if (isMinuteLevel && spansAtLeastOneDay) {
+      bucketPeriod = Period.days(1);
+      windowSize = 1;
+      windowUnit = TimeUnit.DAYS;
+      windowSizeMillis = TimeUnit.MILLISECONDS.convert(windowSize, windowUnit);
+    }
+  }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org