You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/03/29 20:12:43 UTC
[incubator-pinot] branch master updated: Fail pipeline all windows
failed (#4032)
This is an automated email from the ASF dual-hosted git repository.
xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 334b918 Fail pipeline all windows failed (#4032)
334b918 is described below
commit 334b918c2e7832776baf52e62bb48d4c806e6fe0
Author: Xiaohui Sun <xh...@linkedin.com>
AuthorDate: Fri Mar 29 13:12:38 2019 -0700
Fail pipeline if all windows failed (#4032)
* [TE] Reduce logging for data-insufficient exceptions.
* [TE] Fail the pipeline if all detection windows failed.
---
.../thirdeye/detection/algorithm/MergeWrapper.java | 5 ++++
.../detection/wrapper/AnomalyDetectorWrapper.java | 27 +++++++++++++++++-----
2 files changed, 26 insertions(+), 6 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
index d4dabf1..83be36c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
@@ -38,9 +38,12 @@ import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.detection.ConfigUtils;
import org.apache.pinot.thirdeye.detection.DataProvider;
import org.apache.pinot.thirdeye.detection.DetectionPipeline;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineJob;
import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
import org.apache.pinot.thirdeye.detection.DetectionUtils;
import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -53,6 +56,7 @@ public class MergeWrapper extends DetectionPipeline {
private static final String PROP_MERGE_KEY = "mergeKey";
private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
private static final int NUMBER_OF_SPLITED_ANOMALIES_LIMIT = 1000;
+ private static final Logger LOG = LoggerFactory.getLogger(MergeWrapper.class);
protected static final Comparator<MergedAnomalyResultDTO> COMPARATOR = new Comparator<MergedAnomalyResultDTO>() {
@Override
@@ -212,6 +216,7 @@ public class MergeWrapper extends DetectionPipeline {
int anomalyCountAfterSplit = (int) Math.ceil((anomaly.getEndTime() - anomaly.getStartTime()) / (double) maxDuration);
if (anomalyCountAfterSplit > NUMBER_OF_SPLITED_ANOMALIES_LIMIT) {
// if the number of anomalies after split is more than the limit, don't split
+ LOG.warn("Exceeded max number of split count. maxDuration = {}, anomaly split count = {}", maxDuration, anomalyCountAfterSplit);
return Collections.singleton(anomaly);
}
Set<MergedAnomalyResultDTO> result = new HashSet<>();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index 09a64bb..e8b12db 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -41,6 +41,8 @@ import org.apache.pinot.thirdeye.detection.DetectionPipeline;
import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
import org.apache.pinot.thirdeye.detection.DetectionUtils;
import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.exception.DetectorDataInsufficientException;
+import org.apache.pinot.thirdeye.detection.spi.exception.DetectorException;
import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
import org.joda.time.DateTime;
@@ -73,7 +75,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
private static final String PROP_TIMEZONE = "timezone";
private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
private static final String PROP_CACHE_PERIOD_LOOKBACK = "cachingPeriodLookback";
- private static final long DEFAULT_CACHING_PERIOD_LOOKBACK = TimeUnit.DAYS.toMillis(30);
+ private static final long DEFAULT_CACHING_PERIOD_LOOKBACK = TimeUnit.DAYS.toMillis(0);
private static final long CACHING_PERIOD_LOOKBACK_DAILY = TimeUnit.DAYS.toMillis(90);
private static final long CACHING_PERIOD_LOOKBACK_HOURLY = TimeUnit.DAYS.toMillis(60);
// disable minute level cache warm up
@@ -144,7 +146,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
// 1. get the last time stamp for the time series.
// 2. to calculate current values and baseline values for the anomalies detected
// 3. anomaly detection current and baseline time series value
- if( this.cachingPeriodLookback >= 0) {
+ if( this.cachingPeriodLookback > 0) {
MetricSlice cacheSlice = MetricSlice.from(this.metricEntity.getId(), startTime - cachingPeriodLookback, endTime,
this.metricEntity.getFilters());
this.provider.fetchTimeseries(Collections.singleton(cacheSlice));
@@ -152,18 +154,31 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
List<Interval> monitoringWindows = this.getMonitoringWindows();
List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
- for (Interval window : monitoringWindows) {
+ int successWindows = 0;
+ for (int i = 0; i < monitoringWindows.size(); i++) {
+ Interval window = monitoringWindows.get(i);
List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>();
try {
- LOG.info("[New Pipeline] running detection for config {} metricUrn {}. start time {}, end time {}", config.getId(), metricUrn, window.getStart(), window.getEnd());
+ LOG.info("[Pipeline] running detection for config {} metricUrn {} window ({}/{}) - start {} end {}",
+ config.getId(), metricUrn, i, monitoringWindows.size(), window.getStart(), window.getEnd());
long ts = System.currentTimeMillis();
anomaliesForOneWindow = anomalyDetector.runDetection(window, this.metricUrn);
- LOG.info("[New Pipeline] run anomaly detection for window {} - {} used {} milliseconds, detected {} anomalies", window.getStart(), window.getEnd(), System.currentTimeMillis() - ts, anomaliesForOneWindow.size());
- } catch (Exception e) {
+ LOG.info("[Pipeline] run anomaly detection for window ({}/{}) - start {} end {} used {} milliseconds, detected {} anomalies",
+ i, monitoringWindows.size(), window.getStart(), window.getEnd(), System.currentTimeMillis() - ts, anomaliesForOneWindow.size());
+ successWindows++;
+ }
+ catch (DetectorDataInsufficientException e) {
+ LOG.warn("[DetectionConfigID{}] Insufficient data ro run detection for window {} to {}.", this.config.getId(), window.getStart(), window.getEnd());
+ }
+ catch (Exception e) {
LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
}
anomalies.addAll(anomaliesForOneWindow);
}
+ if (successWindows == 0 && monitoringWindows.size() > 0) {
+ LOG.error("All {} detection windows failed for config {} metricUrn {}.", monitoringWindows.size(), config.getId(), metricUrn);
+ throw new DetectorException("All detection windows failed.");
+ }
for (MergedAnomalyResultDTO anomaly : anomalies) {
anomaly.setDetectionConfigId(this.config.getId());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org