You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/04/25 22:52:42 UTC

[incubator-pinot] branch master updated: [TE] add threshold for dimension explore (#4166)

This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d60f445  [TE] add threshold for dimension explore (#4166)
d60f445 is described below

commit d60f4454540ea71b42230c82829965c086433c44
Author: Xiaohui Sun <xh...@linkedin.com>
AuthorDate: Thu Apr 25 15:52:37 2019 -0700

    [TE] add threshold for dimension explore (#4166)
---
 .../detection/algorithm/DimensionWrapper.java      | 47 +++++++++++++++++-----
 .../detection/wrapper/AnomalyDetectorWrapper.java  | 11 +++--
 2 files changed, 45 insertions(+), 13 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
index 8950d52..51bdaf5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
@@ -70,6 +70,11 @@ public class DimensionWrapper extends DetectionPipeline {
   private static final String PROP_NESTED_METRIC_URNS = "nestedMetricUrns";
 
   private static final String PROP_CLASS_NAME = "className";
+
+  // Max number of dimension combinations we can handle.
+  private static final int MAX_DIMENSION_COMBINATIONS = 20000;
+
+  // Stop running if the first several dimension combinations all failed.
   private static final int EARLY_STOP_THRESHOLD = 10;
 
   private final String metricUrn;
@@ -126,8 +131,12 @@ public class DimensionWrapper extends DetectionPipeline {
     }
   }
 
-  @Override
-  public DetectionPipelineResult run() throws Exception {
+  /**
+   * Run Dimension explore and return explored metrics.
+   *
+   * @return List of metrics to process.
+   */
+  private List<MetricEntity> dimensionExplore() {
     List<MetricEntity> nestedMetrics = new ArrayList<>();
 
     if (this.metricUrn != null) {
@@ -140,7 +149,7 @@ public class DimensionWrapper extends DetectionPipeline {
       DataFrame aggregates = this.provider.fetchAggregates(Collections.singletonList(slice), this.dimensions).get(slice);
 
       if (aggregates.isEmpty()) {
-        return new DetectionPipelineResult(Collections.<MergedAnomalyResultDTO>emptyList(), -1);
+        return nestedMetrics;
       }
 
       final double total = aggregates.getDoubles(COL_VALUE).sum().fillNull().doubleValue();
@@ -197,12 +206,29 @@ public class DimensionWrapper extends DetectionPipeline {
       nestedMetrics = nestedMetrics.stream().filter(metricEntity -> checkMinLiveZone(metricEntity)).collect(Collectors.toList());
     }
 
+    return nestedMetrics;
+  }
+
+  @Override
+  public DetectionPipelineResult run() throws Exception {
+    List<MetricEntity> nestedMetrics = dimensionExplore();
+    if (nestedMetrics.isEmpty()) {
+      return new DetectionPipelineResult(Collections.<MergedAnomalyResultDTO>emptyList(), -1);
+    }
+    if (nestedMetrics.size() > MAX_DIMENSION_COMBINATIONS) {
+      throw new DetectionPipelineException(String.format(
+          "Dimension combination for %d is %d which exceeds limit of %d",
+          this.config.getId(), nestedMetrics.size(), MAX_DIMENSION_COMBINATIONS));
+    }
+
     List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
     Map<String, Object> diagnostics = new HashMap<>();
     Set<Long> lastTimeStamps = new HashSet<>();
+
     long totalNestedMetrics = nestedMetrics.size();
     long successNestedMetrics = 0; // record the number of successfully explored dimensions
-    LOG.info("exploring {} metrics", totalNestedMetrics);
+    Exception lastException = null;
+    LOG.info("Run detection for {} metrics", totalNestedMetrics);
     for (int i = 0; i < totalNestedMetrics; i++) {
       checkEarlyStop(totalNestedMetrics, successNestedMetrics, i);
       MetricEntity metric = nestedMetrics.get(i);
@@ -218,10 +244,11 @@ public class DimensionWrapper extends DetectionPipeline {
       } catch (Exception e) {
         LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed for metric urn {}.",
             this.config.getId(), this.start, this.end, metric.getUrn(), e);
+        lastException = e;
       }
     }
 
-    checkDimensionExploreStatus(totalNestedMetrics, successNestedMetrics);
+    checkNestedMetricsStatus(totalNestedMetrics, successNestedMetrics, lastException);
     return new DetectionPipelineResult(anomalies, DetectionUtils.consolidateNestedLastTimeStamps(lastTimeStamps))
         .setDiagnostics(diagnostics);
   }
@@ -230,18 +257,18 @@ public class DimensionWrapper extends DetectionPipeline {
     // if the first certain number of dimensions all failed, throw an exception
     if (i == EARLY_STOP_THRESHOLD && successNestedMetrics == 0) {
       throw new DetectionPipelineException(String.format(
-          "Detection failed for first %d out of %d metric dimensions for monitoring window %d to %d, stop dimension explore.",
+          "Detection failed for first %d out of %d metric dimensions for monitoring window %d to %d, stop processing.",
           i, totalNestedMetrics, this.getStartTime(), this.getEndTime()));
     }
   }
 
-  private void checkDimensionExploreStatus(long totalNestedMetrics, long successNestedMetrics)
+  private void checkNestedMetricsStatus(long totalNestedMetrics, long successNestedMetrics, Exception lastException)
       throws DetectionPipelineException {
-    // if all dimension explore failed, throw an exception
+    // if all nested metrics failed, throw an exception
     if (successNestedMetrics == 0 && totalNestedMetrics > 0) {
       throw new DetectionPipelineException(String.format(
-          "Detection failed for all nested dimensions for detection config id %d for monitoring window %d to %d, stop dimension explore.",
-          this.config.getId(), this.getStartTime(), this.getEndTime()));
+          "Detection failed for all nested dimensions for detection config id %d for monitoring window %d to %d.",
+          this.config.getId(), this.getStartTime(), this.getEndTime()), lastException);
     }
   }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index 3d36eb1..55eb92e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -50,6 +50,7 @@ import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
 import org.joda.time.Period;
+import org.omg.SendingContext.RunTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -160,6 +161,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
     int totalWindows = monitoringWindows.size();
     int successWindows = 0;
+    // Last exception seen across the detection windows; passed along if every window fails.
+    Exception lastException = null;
     for (int i = 0; i < totalWindows; i++) {
       checkEarlyStop(totalWindows, successWindows, i);
 
@@ -178,14 +181,16 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
       }
       catch (DetectorDataInsufficientException e) {
         LOG.warn("[DetectionConfigID{}] Insufficient data ro run detection for window {} to {}.", this.config.getId(), window.getStart(), window.getEnd());
+        lastException = e;
       }
       catch (Exception e) {
         LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
+        lastException = e;
       }
       anomalies.addAll(anomaliesForOneWindow);
     }
 
-    checkMovingWindowDetectionStatus(totalWindows, successWindows);
+    checkMovingWindowDetectionStatus(totalWindows, successWindows, lastException);
 
     for (MergedAnomalyResultDTO anomaly : anomalies) {
       anomaly.setDetectionConfigId(this.config.getId());
@@ -209,12 +214,12 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     }
   }
 
-  private void checkMovingWindowDetectionStatus(int totalWindows, int successWindows) throws DetectionPipelineException {
+  private void checkMovingWindowDetectionStatus(int totalWindows, int successWindows, Exception lastException) throws DetectionPipelineException {
     // if all moving window detection failed, throw an exception
     if (successWindows == 0 && totalWindows > 0) {
       throw new DetectionPipelineException(String.format(
           "Detection failed for all windows for detection config id %d detector %s for monitoring window %d to %d.",
-          this.config.getId(), this.detectorName, this.getStartTime(), this.getEndTime()));
+          this.config.getId(), this.detectorName, this.getStartTime(), this.getEndTime()), lastException);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org