You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/04/01 17:21:16 UTC
[incubator-pinot] 01/01: [TE] Add early terminate in detection loop
This is an automated email from the ASF dual-hosted git repository.
xhsun pushed a commit to branch add_early_terminate_detection
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 28b63210581908cb734b76112dea72fb219c4c11
Author: Xiaohui Sun <xh...@xhsun-mn3.linkedin.biz>
AuthorDate: Mon Apr 1 10:20:56 2019 -0700
[TE] Add early terminate in detection loop
---
.../detection/wrapper/AnomalyDetectorWrapper.java | 32 ++++++++++++++++++----
1 file changed, 27 insertions(+), 5 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index f8ebe7c..b888379 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -76,11 +76,14 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
private static final String PROP_TIMEZONE = "timezone";
private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
private static final String PROP_CACHE_PERIOD_LOOKBACK = "cachingPeriodLookback";
- private static final long DEFAULT_CACHING_PERIOD_LOOKBACK = TimeUnit.DAYS.toMillis(0);
+ private static final long DEFAULT_CACHING_PERIOD_LOOKBACK = TimeUnit.DAYS.toMillis(-1);
private static final long CACHING_PERIOD_LOOKBACK_DAILY = TimeUnit.DAYS.toMillis(90);
private static final long CACHING_PERIOD_LOOKBACK_HOURLY = TimeUnit.DAYS.toMillis(60);
// disable minute level cache warm up
private static final long CACHING_PERIOD_LOOKBACK_MINUTELY = -1;
+ // fail the detection job early if the first 10 windows all fail in succession
+ private static final long EARLY_TERMINATE_WINDOW = 10;
+
private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectorWrapper.class);
@@ -155,8 +158,13 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
List<Interval> monitoringWindows = this.getMonitoringWindows();
List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
+ int totalWindows = monitoringWindows.size();
int successWindows = 0;
- for (int i = 0; i < monitoringWindows.size(); i++) {
+ Exception lastException = null;
+ for (int i = 0; i < totalWindows; i++) {
+ earlyTerminate(i, successWindows, totalWindows, lastException);
+
+ // run detection
Interval window = monitoringWindows.get(i);
List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>();
try {
@@ -169,16 +177,20 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
successWindows++;
}
catch (DetectorDataInsufficientException e) {
+ lastException = e;
LOG.warn("[DetectionConfigID{}] Insufficient data ro run detection for window {} to {}.", this.config.getId(), window.getStart(), window.getEnd());
}
catch (Exception e) {
+ lastException = e;
LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
}
anomalies.addAll(anomaliesForOneWindow);
}
- if (successWindows == 0 && monitoringWindows.size() > 0) {
- LOG.error("All {} detection windows failed for config {} metricUrn {}.", monitoringWindows.size(), config.getId(), metricUrn);
- throw new DetectorException("All detection windows failed.");
+
+ // throw exception if all windows failed
+ if (successWindows == 0 && totalWindows > 0) {
+ LOG.error("All {} detection windows failed for config {} metricUrn {}.", totalWindows, config.getId(), metricUrn);
+ throw new DetectorException("All " + totalWindows + " detection windows failed.", lastException);
}
for (MergedAnomalyResultDTO anomaly : anomalies) {
@@ -194,6 +206,16 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
Collectors.toList()), lastTimeStamp);
}
+ private void earlyTerminate(int currentWindows, int successWindows, int totalWindows, Exception lastException)
+ throws DetectorException {
+ // terminate early if the first EARLY_TERMINATE_WINDOW windows have all failed
+ if (currentWindows == EARLY_TERMINATE_WINDOW && successWindows == 0) {
+ LOG.error("Successive first {} detection windows failed for config {} metricUrn {}.", EARLY_TERMINATE_WINDOW, config.getId(), metricUrn);
+ throw new DetectorException(String.format("Successive first %d/%d detection windows failed.", EARLY_TERMINATE_WINDOW, totalWindows),
+ lastException);
+ }
+ }
+
// guess-timate the next time stamp
// There are two cases. If the data is complete, the next detection starts from the end time of this detection.
// If the data is incomplete, the next detection starts from the latest available data's time stamp plus one time granularity.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org