You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/04/25 22:52:42 UTC
[incubator-pinot] branch master updated: [TE] add threshold for dimension explore (#4166)
This is an automated email from the ASF dual-hosted git repository.
xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d60f445 [TE] add threshold for dimension explore (#4166)
d60f445 is described below
commit d60f4454540ea71b42230c82829965c086433c44
Author: Xiaohui Sun <xh...@linkedin.com>
AuthorDate: Thu Apr 25 15:52:37 2019 -0700
[TE] add threshold for dimension explore (#4166)
---
.../detection/algorithm/DimensionWrapper.java | 47 +++++++++++++++++-----
.../detection/wrapper/AnomalyDetectorWrapper.java | 11 +++--
2 files changed, 45 insertions(+), 13 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
index 8950d52..51bdaf5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
@@ -70,6 +70,11 @@ public class DimensionWrapper extends DetectionPipeline {
private static final String PROP_NESTED_METRIC_URNS = "nestedMetricUrns";
private static final String PROP_CLASS_NAME = "className";
+
+ // Max number of dimension combinations we can handle.
+ private static final int MAX_DIMENSION_COMBINATIONS = 20000;
+
+ // Stop running if the first several dimension combinations all failed.
private static final int EARLY_STOP_THRESHOLD = 10;
private final String metricUrn;
@@ -126,8 +131,12 @@ public class DimensionWrapper extends DetectionPipeline {
}
}
- @Override
- public DetectionPipelineResult run() throws Exception {
+ /**
+ * Run Dimension explore and return explored metrics.
+ *
+ * @return List of metrics to process.
+ */
+ private List<MetricEntity> dimensionExplore() {
List<MetricEntity> nestedMetrics = new ArrayList<>();
if (this.metricUrn != null) {
@@ -140,7 +149,7 @@ public class DimensionWrapper extends DetectionPipeline {
DataFrame aggregates = this.provider.fetchAggregates(Collections.singletonList(slice), this.dimensions).get(slice);
if (aggregates.isEmpty()) {
- return new DetectionPipelineResult(Collections.<MergedAnomalyResultDTO>emptyList(), -1);
+ return nestedMetrics;
}
final double total = aggregates.getDoubles(COL_VALUE).sum().fillNull().doubleValue();
@@ -197,12 +206,29 @@ public class DimensionWrapper extends DetectionPipeline {
nestedMetrics = nestedMetrics.stream().filter(metricEntity -> checkMinLiveZone(metricEntity)).collect(Collectors.toList());
}
+ return nestedMetrics;
+ }
+
+ @Override
+ public DetectionPipelineResult run() throws Exception {
+ List<MetricEntity> nestedMetrics = dimensionExplore();
+ if (nestedMetrics.isEmpty()) {
+ return new DetectionPipelineResult(Collections.<MergedAnomalyResultDTO>emptyList(), -1);
+ }
+ if (nestedMetrics.size() > MAX_DIMENSION_COMBINATIONS) {
+ throw new DetectionPipelineException(String.format(
+ "Dimension combination for {} is {} which exceeds limit of {}",
+ this.config.getId(), nestedMetrics.size(), MAX_DIMENSION_COMBINATIONS));
+ }
+
List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
Map<String, Object> diagnostics = new HashMap<>();
Set<Long> lastTimeStamps = new HashSet<>();
+
long totalNestedMetrics = nestedMetrics.size();
long successNestedMetrics = 0; // record the number of successfully explored dimensions
- LOG.info("exploring {} metrics", totalNestedMetrics);
+ Exception lastException = null;
+ LOG.info("Run detection for {} metrics", totalNestedMetrics);
for (int i = 0; i < totalNestedMetrics; i++) {
checkEarlyStop(totalNestedMetrics, successNestedMetrics, i);
MetricEntity metric = nestedMetrics.get(i);
@@ -218,10 +244,11 @@ public class DimensionWrapper extends DetectionPipeline {
} catch (Exception e) {
LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed for metric urn {}.",
this.config.getId(), this.start, this.end, metric.getUrn(), e);
+ lastException = e;
}
}
- checkDimensionExploreStatus(totalNestedMetrics, successNestedMetrics);
+ checkNestedMetricsStatus(totalNestedMetrics, successNestedMetrics, lastException);
return new DetectionPipelineResult(anomalies, DetectionUtils.consolidateNestedLastTimeStamps(lastTimeStamps))
.setDiagnostics(diagnostics);
}
@@ -230,18 +257,18 @@ public class DimensionWrapper extends DetectionPipeline {
// if the first certain number of dimensions all failed, throw an exception
if (i == EARLY_STOP_THRESHOLD && successNestedMetrics == 0) {
throw new DetectionPipelineException(String.format(
- "Detection failed for first %d out of %d metric dimensions for monitoring window %d to %d, stop dimension explore.",
+ "Detection failed for first %d out of %d metric dimensions for monitoring window %d to %d, stop processing.",
i, totalNestedMetrics, this.getStartTime(), this.getEndTime()));
}
}
- private void checkDimensionExploreStatus(long totalNestedMetrics, long successNestedMetrics)
+ private void checkNestedMetricsStatus(long totalNestedMetrics, long successNestedMetrics, Exception lastException)
throws DetectionPipelineException {
- // if all dimension explore failed, throw an exception
+ // if all nested metrics failed, throw an exception
if (successNestedMetrics == 0 && totalNestedMetrics > 0) {
throw new DetectionPipelineException(String.format(
- "Detection failed for all nested dimensions for detection config id %d for monitoring window %d to %d, stop dimension explore.",
- this.config.getId(), this.getStartTime(), this.getEndTime()));
+ "Detection failed for all nested dimensions for detection config id %d for monitoring window %d to %d.",
+ this.config.getId(), this.getStartTime(), this.getEndTime()), lastException);
}
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index 3d36eb1..55eb92e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -50,6 +50,7 @@ import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Period;
+import org.omg.SendingContext.RunTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,6 +161,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
int totalWindows = monitoringWindows.size();
int successWindows = 0;
+ // The last exception of the detection windows. It will be thrown out to upper level.
+ Exception lastException = null;
for (int i = 0; i < totalWindows; i++) {
checkEarlyStop(totalWindows, successWindows, i);
@@ -178,14 +181,16 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
}
catch (DetectorDataInsufficientException e) {
LOG.warn("[DetectionConfigID{}] Insufficient data ro run detection for window {} to {}.", this.config.getId(), window.getStart(), window.getEnd());
+ lastException = e;
}
catch (Exception e) {
LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
+ lastException = e;
}
anomalies.addAll(anomaliesForOneWindow);
}
- checkMovingWindowDetectionStatus(totalWindows, successWindows);
+ checkMovingWindowDetectionStatus(totalWindows, successWindows, lastException);
for (MergedAnomalyResultDTO anomaly : anomalies) {
anomaly.setDetectionConfigId(this.config.getId());
@@ -209,12 +214,12 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
}
}
- private void checkMovingWindowDetectionStatus(int totalWindows, int successWindows) throws DetectionPipelineException {
+ private void checkMovingWindowDetectionStatus(int totalWindows, int successWindows, Exception lastException) throws DetectionPipelineException {
// if all moving window detection failed, throw an exception
if (successWindows == 0 && totalWindows > 0) {
throw new DetectionPipelineException(String.format(
"Detection failed for all windows for detection config id %d detector %s for monitoring window %d to %d.",
- this.config.getId(), this.detectorName, this.getStartTime(), this.getEndTime()));
+ this.config.getId(), this.detectorName, this.getStartTime(), this.getEndTime()), lastException);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org