Posted to commits@pinot.apache.org by ak...@apache.org on 2019/03/22 23:03:19 UTC
[incubator-pinot] branch master updated: [TE] Fix ArrayIndexOutOfBoundsException; Wrap replay endpoint and ret… (#4003)
This is an automated email from the ASF dual-hosted git repository.
akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d58f8bc [TE] Fix ArrayIndexOutOfBoundsException; Wrap replay endpoint and ret… (#4003)
d58f8bc is described below
commit d58f8bce4b59de096b4ee9fee61c679482dd1d7d
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Fri Mar 22 16:03:14 2019 -0700
[TE] Fix ArrayIndexOutOfBoundsException; Wrap replay endpoint and ret… (#4003)
---
.../thirdeye/detection/DetectionResource.java | 136 ++++++++++++---------
.../PercentageChangeRuleAnomalyFilter.java | 24 +++-
2 files changed, 97 insertions(+), 63 deletions(-)
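Both files serve the same goal of making replay more robust. In DetectionResource.java, the two replay endpoints stop declaring throws Exception; instead they catch failures themselves, log them, and return a 500 with a small message map, so a broken replay surfaces as a readable API error rather than an unhandled exception. A minimal, self-contained sketch of that JAX-RS pattern follows (the class name, path, and the doReplay helper are illustrative stand-ins, not taken from this diff):

    import java.util.HashMap;
    import java.util.Map;
    import javax.ws.rs.POST;
    import javax.ws.rs.Path;
    import javax.ws.rs.PathParam;
    import javax.ws.rs.core.Response;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    @Path("/example")
    public class ExampleReplayResource {
      private static final Logger LOG = LoggerFactory.getLogger(ExampleReplayResource.class);

      @POST
      @Path("/replay/{id}")
      public Response replay(@PathParam("id") long id) {
        Map<String, String> responseMessage = new HashMap<>();
        try {
          // Run the (potentially long and failure-prone) replay inside the try block.
          Object replayResult = doReplay(id); // hypothetical helper standing in for the pipeline run
          return Response.ok(replayResult).build();
        } catch (Exception e) {
          // Log server-side and return a readable message to the caller instead of letting
          // the container translate the exception.
          LOG.error("Error running replay on detection id " + id, e);
          responseMessage.put("message", "Failed to run the replay due to " + e.getMessage());
          return Response.serverError().entity(responseMessage).build();
        }
      }

      private Object doReplay(long id) {
        // Placeholder for the real detection pipeline invocation.
        return "ok";
      }
    }

The diff below applies this same shape to both the /legacy-replay/{id} endpoint and the second replay endpoint.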
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
index 1504640..38fad77 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -338,7 +339,6 @@ public class DetectionResource {
* @param windowSize (optional) override the default window size
* @param bucketSize (optional) override the default window size
* @return anomalies
- * @throws Exception
*/
@POST
@Path("/legacy-replay/{id}")
@@ -350,47 +350,55 @@ public class DetectionResource {
@QueryParam("end") long end,
@QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean deleteExistingAnomaly,
@QueryParam("windowSize") Long windowSize,
- @QueryParam("bucketSize") Long bucketSize) throws Exception {
-
- DetectionConfigDTO config = this.configDAO.findById(configId);
- if (config == null) {
- throw new IllegalArgumentException(String.format("Cannot find config %d", configId));
- }
-
- AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
- if (deleteExistingAnomaly) {
- // clear existing anomalies
- Collection<MergedAnomalyResultDTO> existing =
- this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
-
- List<Long> existingIds = new ArrayList<>();
- for (MergedAnomalyResultDTO anomaly : existing) {
- existingIds.add(anomaly.getId());
+ @QueryParam("bucketSize") Long bucketSize) {
+ Map<String, String> responseMessage = new HashMap<>();
+ Collection<MergedAnomalyResultDTO> replayResult;
+ try {
+ DetectionConfigDTO config = this.configDAO.findById(configId);
+ if (config == null) {
+ throw new IllegalArgumentException(String.format("Cannot find config %d", configId));
}
- this.anomalyDAO.deleteByIds(existingIds);
- }
-
- // execute replay
- List<Interval> monitoringWindows = getReplayMonitoringWindows(config, start, end, windowSize, bucketSize);
- for (Interval monitoringWindow : monitoringWindows){
- DetectionPipeline pipeline = this.loader.from(this.provider, config, monitoringWindow.getStartMillis(), monitoringWindow.getEndMillis());
- DetectionPipelineResult result = pipeline.run();
- // Update
- if (result.getLastTimestamp() > config.getLastTimestamp()) {
- config.setLastTimestamp(result.getLastTimestamp());
- this.configDAO.update(config);
+ AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
+ if (deleteExistingAnomaly) {
+ // clear existing anomalies
+ Collection<MergedAnomalyResultDTO> existing =
+ this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+
+ List<Long> existingIds = new ArrayList<>();
+ for (MergedAnomalyResultDTO anomaly : existing) {
+ existingIds.add(anomaly.getId());
+ }
+ this.anomalyDAO.deleteByIds(existingIds);
}
- for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
- anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
- this.anomalyDAO.save(anomaly);
+ // execute replay
+ List<Interval> monitoringWindows = getReplayMonitoringWindows(config, start, end, windowSize, bucketSize);
+ for (Interval monitoringWindow : monitoringWindows){
+ DetectionPipeline pipeline = this.loader.from(this.provider, config, monitoringWindow.getStartMillis(), monitoringWindow.getEndMillis());
+ DetectionPipelineResult result = pipeline.run();
+
+ // Update
+ if (result.getLastTimestamp() > config.getLastTimestamp()) {
+ config.setLastTimestamp(result.getLastTimestamp());
+ this.configDAO.update(config);
+ }
+
+ for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
+ anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
+ this.anomalyDAO.save(anomaly);
+ }
}
- }
- Collection<MergedAnomalyResultDTO> replayResult = this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+ replayResult = this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+
+ } catch (Exception e) {
+ LOG.error("Error running replay on detection id " + configId, e);
+ responseMessage.put("message", "Failed to run the replay due to " + e.getMessage());
+ return Response.serverError().entity(responseMessage).build();
+ }
- LOG.info("replay detection pipeline {} generated {} anomalies.", config.getId(), replayResult.size());
+ LOG.info("Replay detection pipeline {} generated {} anomalies.", configId, replayResult.size());
return Response.ok(replayResult).build();
}
@@ -410,40 +418,48 @@ public class DetectionResource {
@QueryParam("start") long start,
@QueryParam("end") long end,
@QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean deleteExistingAnomaly) throws Exception {
+ Map<String, String> responseMessage = new HashMap<>();
+ DetectionPipelineResult result;
+ try {
+ DetectionConfigDTO config = this.configDAO.findById(detectionId);
+ if (config == null) {
+ throw new IllegalArgumentException(String.format("Cannot find config %d", detectionId));
+ }
- DetectionConfigDTO config = this.configDAO.findById(detectionId);
- if (config == null) {
- throw new IllegalArgumentException(String.format("Cannot find config %d", detectionId));
- }
+ if (deleteExistingAnomaly) {
+ AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
+ Collection<MergedAnomalyResultDTO> existing =
+ this.provider.fetchAnomalies(Collections.singleton(slice), detectionId).get(slice);
- if (deleteExistingAnomaly) {
- AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
- Collection<MergedAnomalyResultDTO> existing =
- this.provider.fetchAnomalies(Collections.singleton(slice), detectionId).get(slice);
+ List<Long> existingIds = new ArrayList<>();
+ for (MergedAnomalyResultDTO anomaly : existing) {
+ existingIds.add(anomaly.getId());
+ }
- List<Long> existingIds = new ArrayList<>();
- for (MergedAnomalyResultDTO anomaly : existing) {
- existingIds.add(anomaly.getId());
+ this.anomalyDAO.deleteByIds(existingIds);
}
- this.anomalyDAO.deleteByIds(existingIds);
- }
+ // execute replay
+ DetectionPipeline pipeline = this.loader.from(this.provider, config, start, end);
+ result = pipeline.run();
- // execute replay
- DetectionPipeline pipeline = this.loader.from(this.provider, config, start, end);
- DetectionPipelineResult result = pipeline.run();
-
- // Update state
- if (result.getLastTimestamp() > config.getLastTimestamp()) {
- config.setLastTimestamp(result.getLastTimestamp());
- this.configDAO.update(config);
- }
+ // Update state
+ if (result.getLastTimestamp() > config.getLastTimestamp()) {
+ config.setLastTimestamp(result.getLastTimestamp());
+ this.configDAO.update(config);
+ }
- for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
- anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
- this.anomalyDAO.save(anomaly);
+ for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
+ anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
+ this.anomalyDAO.save(anomaly);
+ }
+ } catch (Exception e) {
+ LOG.error("Error running replay on detection id " + detectionId, e);
+ responseMessage.put("message", "Failed to run the replay due to " + e.getMessage());
+ return Response.serverError().entity(responseMessage).build();
}
+ LOG.info("Replay detection pipeline {} generated {} anomalies.", detectionId, result.anomalies.size());
return Response.ok(result).build();
}
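Assuming the resource serializes the message map to JSON (the class-level @Produces configuration is not part of this diff), a client replaying a missing config id, say 123, would now receive an HTTP 500 with a body along the lines of {"message": "Failed to run the replay due to Cannot find config 123"} instead of a propagated exception.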
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilter.java
index ac7ae13..a44e626 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilter.java
@@ -36,6 +36,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
@@ -45,6 +47,7 @@ import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
*/
@Components(type = "PERCENTAGE_CHANGE_FILTER", tags = {DetectionTag.RULE_FILTER})
public class PercentageChangeRuleAnomalyFilter implements AnomalyFilter<PercentageChangeRuleAnomalyFilterSpec> {
+ private static final Logger LOG = LoggerFactory.getLogger(PercentageChangeRuleAnomalyFilter.class);
private double threshold;
private InputDataFetcher dataFetcher;
private Baseline baseline;
@@ -68,9 +71,24 @@ public class PercentageChangeRuleAnomalyFilter implements AnomalyFilter<Percenta
Map<MetricSlice, DataFrame> aggregates =
this.dataFetcher.fetchData(new InputDataSpec().withAggregateSlices(slices)).getAggregates();
- double currentValue = getValueFromAggregates(currentSlice, aggregates);
- double baselineValue =
- baselineSlice == null ? anomaly.getAvgBaselineVal() : getValueFromAggregates(baselineSlice, aggregates);
+ double currentValue;
+ if (aggregates.get(currentSlice).isEmpty()) {
+ currentValue = anomaly.getAvgCurrentVal();
+ } else {
+ currentValue = getValueFromAggregates(currentSlice, aggregates);
+ }
+
+ double baselineValue;
+ if (baselineSlice == null) {
+ baselineValue = anomaly.getAvgBaselineVal();
+ } else if (aggregates.get(baselineSlice).isEmpty()) {
+ baselineValue = anomaly.getAvgBaselineVal();
+ LOG.warn("Unable to fetch data for baseline slice for anomaly {}. start = {} end = {} filters = {}. Using anomaly"
+ + " baseline ", anomaly.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
+ } else {
+ baselineValue = getValueFromAggregates(baselineSlice, aggregates);
+ }
+
// if inconsistent with up/down, filter the anomaly
if (!pattern.equals(Pattern.UP_OR_DOWN) && (currentValue < baselineValue && pattern.equals(Pattern.UP)) || (
currentValue > baselineValue && pattern.equals(Pattern.DOWN))) {
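The PercentageChangeRuleAnomalyFilter change is the ArrayIndexOutOfBoundsException fix from the commit title: when the fetched aggregate DataFrame for a slice comes back empty, reading a value out of it fails, so the filter now falls back to the averages already stored on the anomaly (and warns when the baseline slice is empty). A hedged, standalone sketch of that fallback logic, using simplified stand-in types rather than ThirdEye's real DataFrame and DTO classes:

    import java.util.Map;

    public class EmptyAggregateFallbackSketch {
      // Minimal stand-ins for ThirdEye's DataFrame-backed aggregates and anomaly DTO.
      interface Frame { boolean isEmpty(); double firstValue(); }
      interface Anomaly { double getAvgCurrentVal(); double getAvgBaselineVal(); }

      static double currentValue(Map<String, Frame> aggregates, String currentSlice, Anomaly anomaly) {
        Frame frame = aggregates.get(currentSlice);
        // Guard against an empty result before indexing into it; otherwise fall back to the
        // value already stored on the anomaly, mirroring the diff above.
        return (frame == null || frame.isEmpty()) ? anomaly.getAvgCurrentVal() : frame.firstValue();
      }

      static double baselineValue(Map<String, Frame> aggregates, String baselineSlice, Anomaly anomaly) {
        if (baselineSlice == null) {
          return anomaly.getAvgBaselineVal();
        }
        Frame frame = aggregates.get(baselineSlice);
        if (frame == null || frame.isEmpty()) {
          // In the real filter this branch also logs a warning with the anomaly's id and window.
          return anomaly.getAvgBaselineVal();
        }
        return frame.firstValue();
      }
    }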