You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ak...@apache.org on 2019/03/22 23:03:19 UTC

[incubator-pinot] branch master updated: [TE] Fix ArrayIndexOutOfBoundsException; Wrap replay endpoint and ret… (#4003)

This is an automated email from the ASF dual-hosted git repository.

akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d58f8bc  [TE] Fix ArrayIndexOutOfBoundsException; Wrap replay endpoint and ret… (#4003)
d58f8bc is described below

commit d58f8bce4b59de096b4ee9fee61c679482dd1d7d
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Fri Mar 22 16:03:14 2019 -0700

    [TE] Fix ArrayIndexOutOfBoundsException; Wrap replay endpoint and ret… (#4003)
---
 .../thirdeye/detection/DetectionResource.java      | 136 ++++++++++++---------
 .../PercentageChangeRuleAnomalyFilter.java         |  24 +++-
 2 files changed, 97 insertions(+), 63 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
index 1504640..38fad77 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -338,7 +339,6 @@ public class DetectionResource {
    * @param windowSize (optional) override the default window size
    * @param bucketSize (optional) override the default window size
    * @return anomalies
-   * @throws Exception
    */
   @POST
   @Path("/legacy-replay/{id}")
@@ -350,47 +350,55 @@ public class DetectionResource {
       @QueryParam("end") long end,
       @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean deleteExistingAnomaly,
       @QueryParam("windowSize") Long windowSize,
-      @QueryParam("bucketSize") Long bucketSize) throws Exception {
-
-    DetectionConfigDTO config = this.configDAO.findById(configId);
-    if (config == null) {
-      throw new IllegalArgumentException(String.format("Cannot find config %d", configId));
-    }
-
-    AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
-    if (deleteExistingAnomaly) {
-      // clear existing anomalies
-      Collection<MergedAnomalyResultDTO> existing =
-          this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
-
-      List<Long> existingIds = new ArrayList<>();
-      for (MergedAnomalyResultDTO anomaly : existing) {
-        existingIds.add(anomaly.getId());
+      @QueryParam("bucketSize") Long bucketSize) {
+    Map<String, String> responseMessage = new HashMap<>();
+    Collection<MergedAnomalyResultDTO> replayResult;
+    try {
+      DetectionConfigDTO config = this.configDAO.findById(configId);
+      if (config == null) {
+        throw new IllegalArgumentException(String.format("Cannot find config %d", configId));
       }
-      this.anomalyDAO.deleteByIds(existingIds);
-    }
-
-    // execute replay
-    List<Interval> monitoringWindows = getReplayMonitoringWindows(config, start, end, windowSize, bucketSize);
-    for (Interval monitoringWindow : monitoringWindows){
-      DetectionPipeline pipeline = this.loader.from(this.provider, config, monitoringWindow.getStartMillis(), monitoringWindow.getEndMillis());
-      DetectionPipelineResult result = pipeline.run();
 
-      // Update
-      if (result.getLastTimestamp() > config.getLastTimestamp()) {
-        config.setLastTimestamp(result.getLastTimestamp());
-        this.configDAO.update(config);
+      AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
+      if (deleteExistingAnomaly) {
+        // clear existing anomalies
+        Collection<MergedAnomalyResultDTO> existing =
+            this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+
+        List<Long> existingIds = new ArrayList<>();
+        for (MergedAnomalyResultDTO anomaly : existing) {
+          existingIds.add(anomaly.getId());
+        }
+        this.anomalyDAO.deleteByIds(existingIds);
       }
 
-      for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
-        anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
-        this.anomalyDAO.save(anomaly);
+      // execute replay
+      List<Interval> monitoringWindows = getReplayMonitoringWindows(config, start, end, windowSize, bucketSize);
+      for (Interval monitoringWindow : monitoringWindows){
+        DetectionPipeline pipeline = this.loader.from(this.provider, config, monitoringWindow.getStartMillis(), monitoringWindow.getEndMillis());
+        DetectionPipelineResult result = pipeline.run();
+
+        // Update
+        if (result.getLastTimestamp() > config.getLastTimestamp()) {
+          config.setLastTimestamp(result.getLastTimestamp());
+          this.configDAO.update(config);
+        }
+
+        for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
+          anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
+          this.anomalyDAO.save(anomaly);
+        }
       }
-    }
 
-    Collection<MergedAnomalyResultDTO> replayResult = this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+      replayResult = this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+
+    } catch (Exception e) {
+      LOG.error("Error running replay on detection id " + configId, e);
+      responseMessage.put("message", "Failed to run the replay due to " + e.getMessage());
+      return Response.serverError().entity(responseMessage).build();
+    }
 
-    LOG.info("replay detection pipeline {} generated {} anomalies.", config.getId(), replayResult.size());
+    LOG.info("Replay detection pipeline {} generated {} anomalies.", configId, replayResult.size());
     return Response.ok(replayResult).build();
   }
 
@@ -410,40 +418,48 @@ public class DetectionResource {
       @QueryParam("start") long start,
       @QueryParam("end") long end,
       @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean deleteExistingAnomaly) throws Exception {
+    Map<String, String> responseMessage = new HashMap<>();
+    DetectionPipelineResult result;
+    try {
+      DetectionConfigDTO config = this.configDAO.findById(detectionId);
+      if (config == null) {
+        throw new IllegalArgumentException(String.format("Cannot find config %d", detectionId));
+      }
 
-    DetectionConfigDTO config = this.configDAO.findById(detectionId);
-    if (config == null) {
-      throw new IllegalArgumentException(String.format("Cannot find config %d", detectionId));
-    }
+      if (deleteExistingAnomaly) {
+        AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
+        Collection<MergedAnomalyResultDTO> existing =
+            this.provider.fetchAnomalies(Collections.singleton(slice), detectionId).get(slice);
 
-    if (deleteExistingAnomaly) {
-      AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
-      Collection<MergedAnomalyResultDTO> existing =
-          this.provider.fetchAnomalies(Collections.singleton(slice), detectionId).get(slice);
+        List<Long> existingIds = new ArrayList<>();
+        for (MergedAnomalyResultDTO anomaly : existing) {
+          existingIds.add(anomaly.getId());
+        }
 
-      List<Long> existingIds = new ArrayList<>();
-      for (MergedAnomalyResultDTO anomaly : existing) {
-        existingIds.add(anomaly.getId());
+        this.anomalyDAO.deleteByIds(existingIds);
       }
 
-      this.anomalyDAO.deleteByIds(existingIds);
-    }
+      // execute replay
+      DetectionPipeline pipeline = this.loader.from(this.provider, config, start, end);
+      result = pipeline.run();
 
-    // execute replay
-    DetectionPipeline pipeline = this.loader.from(this.provider, config, start, end);
-    DetectionPipelineResult result = pipeline.run();
-
-    // Update state
-    if (result.getLastTimestamp() > config.getLastTimestamp()) {
-      config.setLastTimestamp(result.getLastTimestamp());
-      this.configDAO.update(config);
-    }
+      // Update state
+      if (result.getLastTimestamp() > config.getLastTimestamp()) {
+        config.setLastTimestamp(result.getLastTimestamp());
+        this.configDAO.update(config);
+      }
 
-    for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
-      anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
-      this.anomalyDAO.save(anomaly);
+      for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
+        anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
+        this.anomalyDAO.save(anomaly);
+      }
+    } catch (Exception e) {
+      LOG.error("Error running replay on detection id " + detectionId, e);
+      responseMessage.put("message", "Failed to run the replay due to " + e.getMessage());
+      return Response.serverError().entity(responseMessage).build();
     }
 
+    LOG.info("Replay detection pipeline {} generated {} anomalies.", detectionId, result.anomalies.size());
     return Response.ok(result).build();
   }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilter.java
index ac7ae13..a44e626 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilter.java
@@ -36,6 +36,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
 
@@ -45,6 +47,7 @@ import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
  */
 @Components(type = "PERCENTAGE_CHANGE_FILTER", tags = {DetectionTag.RULE_FILTER})
 public class PercentageChangeRuleAnomalyFilter implements AnomalyFilter<PercentageChangeRuleAnomalyFilterSpec> {
+  private static final Logger LOG = LoggerFactory.getLogger(PercentageChangeRuleAnomalyFilter.class);
   private double threshold;
   private InputDataFetcher dataFetcher;
   private Baseline baseline;
@@ -68,9 +71,24 @@ public class PercentageChangeRuleAnomalyFilter implements AnomalyFilter<Percenta
     Map<MetricSlice, DataFrame> aggregates =
         this.dataFetcher.fetchData(new InputDataSpec().withAggregateSlices(slices)).getAggregates();
 
-    double currentValue = getValueFromAggregates(currentSlice, aggregates);
-    double baselineValue =
-        baselineSlice == null ? anomaly.getAvgBaselineVal() : getValueFromAggregates(baselineSlice, aggregates);
+    double currentValue;
+    if (aggregates.get(currentSlice).isEmpty()) {
+      currentValue = anomaly.getAvgCurrentVal();
+    } else {
+      currentValue = getValueFromAggregates(currentSlice, aggregates);
+    }
+
+    double baselineValue;
+    if (baselineSlice == null) {
+      baselineValue = anomaly.getAvgBaselineVal();
+    } else if (aggregates.get(baselineSlice).isEmpty()) {
+      baselineValue = anomaly.getAvgBaselineVal();
+      LOG.warn("Unable to fetch data for baseline slice for anomaly {}. start = {} end = {} filters = {}. Using anomaly"
+              + " baseline ", anomaly.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
+    } else {
+      baselineValue = getValueFromAggregates(baselineSlice, aggregates);
+    }
+
     // if inconsistent with up/down, filter the anomaly
     if (!pattern.equals(Pattern.UP_OR_DOWN) && (currentValue < baselineValue && pattern.equals(Pattern.UP)) || (
         currentValue > baselineValue && pattern.equals(Pattern.DOWN))) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org