You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2018/12/06 18:18:34 UTC
[incubator-pinot] branch master updated: [TE] detection pipeline -
multiple improvements (#3586)
This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new dcdf5d4 [TE] detection pipeline - multiple improvements (#3586)
dcdf5d4 is described below
commit dcdf5d4f92e2a910c8c118151c4a28af16a0a40a
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Thu Dec 6 10:18:28 2018 -0800
[TE] detection pipeline - multiple improvements (#3586)
- Change the pipeline baseline and predicted values filling so that it's consistent with RCA
- Replay moving window
- Replay endpoint for the new pipeline with legacy behaviors
- Add window size, window unit, WoW rule, min-max threshold migration to YAML migration endpoint
- Baseline filling merger handles anomalies per detector per dimension
- Baseline filling merger injects detector to the next level detector wrapper
- Other fixes
- Tests
---
.../anomaly/detection/DetectionTaskRunner.java | 2 +-
.../bao/jdbc/MergedAnomalyResultManagerImpl.java | 3 +
.../detection/DetectionMigrationResource.java | 56 +++++++-
.../thirdeye/detection/DetectionResource.java | 159 ++++++++++++++++++++-
.../thirdeye/detection/DetectionUtils.java | 4 +-
.../thirdeye/detection/algorithm/MergeWrapper.java | 15 +-
.../detection/wrapper/AnomalyDetectorWrapper.java | 59 ++++++--
.../wrapper/BaselineFillingMergeWrapper.java | 72 ++++++++--
.../wrapper/ChildKeepingMergeWrapper.java | 4 +-
.../yaml/CompositePipelineConfigTranslator.java | 6 +-
.../wrapper/AnomalyDetectorWrapperTest.java | 16 ++-
.../wrapper/BaselineFillingMergeWrapperTest.java | 10 +-
.../compositePipelineTranslatorTestResult-1.json | 8 +-
.../compositePipelineTranslatorTestResult-2.json | 6 +-
14 files changed, 368 insertions(+), 52 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java
index 57d7fb5..30be06f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java
@@ -259,7 +259,7 @@ public class DetectionTaskRunner implements TaskRunner {
historyMergedAnomalies = Collections.emptyList();
}
- LOG.info("Analyzing anomaly function with explored dimensions: {}, windowStart: {}, windowEnd: {}",
+ LOG.info("[Old pipeline] Analyzing anomaly function with explored dimensions: {}, windowStart: {}, windowEnd: {}",
dimensionMap, windowStart, windowEnd);
AnomalyUtils.logAnomaliesOverlapWithWindow(windowStart, windowEnd, historyMergedAnomalies);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
index 42cbc62..aad718a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
@@ -405,6 +405,9 @@ public class MergedAnomalyResultManagerImpl extends AbstractManagerImpl<MergedAn
for (MergedAnomalyResultDTO child : entity.getChildren()) {
if (child.getId() == null) {
// only allow single level to prevent cycles
+ if (child == entity){
+ throw new IllegalArgumentException("Cannot contain itself as child anomaly");
+ }
if (child.getChildren() != null && !child.getChildren().isEmpty()) {
throw new IllegalArgumentException("Multi-level anomaly nesting not supported");
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
index d4035aa..c53af49 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
@@ -28,6 +28,7 @@ import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
import com.linkedin.thirdeye.detector.email.filter.AlertFilterFactory;
import com.linkedin.thirdeye.detector.function.AnomalyFunctionFactory;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -59,6 +60,8 @@ public class DetectionMigrationResource {
private static final Logger LOGGER = LoggerFactory.getLogger(DetectionMigrationResource.class);
private static final String PROP_WINDOW_DELAY = "windowDelay";
private static final String PROP_WINDOW_DELAY_UNIT = "windowDelayUnit";
+ private static final String PROP_WINDOW_SIZE = "windowSize";
+ private static final String PROP_WINDOW_UNIT = "windowUnit";
private final LegacyAnomalyFunctionTranslator translator;
private final AnomalyFunctionManager anomalyFunctionDAO;
@@ -115,8 +118,21 @@ public class DetectionMigrationResource {
ruleYaml.put("name", "myRule");
// detection
- ruleYaml.put("detection", Collections.singletonList(
- ImmutableMap.of("type", "ALGORITHM", "params", getAlgorithmDetectorParams(anomalyFunctionDTO))));
+ if (anomalyFunctionDTO.getType().equals("WEEK_OVER_WEEK_RULE")){
+ // wo1w change detector
+ ruleYaml.put("detection", Collections.singletonList(ImmutableMap.of("type", "PERCENTAGE_RULE",
+ "params", getPercentageChangeRuleDetectorParams(anomalyFunctionDTO))));
+ } else if (anomalyFunctionDTO.getType().equals("MIN_MAX_THRESHOLD")){
+ // threshold detector
+ ruleYaml.put("detection", Collections.singletonList(ImmutableMap.of("type", "THRESHOLD",
+ "params", getMinMaxThresholdRuleDetectorParams(anomalyFunctionDTO))));
+ } else{
+ // algorithm detector
+ ruleYaml.put("detection", Collections.singletonList(
+ ImmutableMap.of("type", "ALGORITHM", "params", getAlgorithmDetectorParams(anomalyFunctionDTO),
+ PROP_WINDOW_SIZE, anomalyFunctionDTO.getWindowSize(),
+ PROP_WINDOW_UNIT, anomalyFunctionDTO.getWindowUnit().toString())));
+ }
// filters
Map<String, String> alertFilter = anomalyFunctionDTO.getAlertFilter();
@@ -157,7 +173,7 @@ public class DetectionMigrationResource {
return this.yaml.dump(yamlConfigs);
}
- private Map<String, Object> getDimensionExplorationParams(AnomalyFunctionDTO functionDTO) {
+ private Map<String, Object> getDimensionExplorationParams(AnomalyFunctionDTO functionDTO) throws IOException {
Map<String, Object> dimensionExploreYaml = new LinkedHashMap<>();
dimensionExploreYaml.put("dimensions", Collections.singletonList(functionDTO.getExploreDimensions()));
if (functionDTO.getDataFilter() != null && !functionDTO.getDataFilter().isEmpty() && functionDTO.getDataFilter().get("type").equals("average_threshold")) {
@@ -166,6 +182,13 @@ public class DetectionMigrationResource {
dimensionExploreYaml.put("minValue", Double.valueOf(functionDTO.getDataFilter().get("threshold")));
dimensionExploreYaml.put("minLiveZone", functionDTO.getDataFilter().get("minLiveZone"));
}
+ if (functionDTO.getType().equals("MIN_MAX_THRESHOLD")){
+ // migrate volume threshold
+ Properties properties = AnomalyFunctionDTO.toProperties(functionDTO.getProperties());
+ if (properties.containsKey("averageVolumeThreshold")){
+ dimensionExploreYaml.put("minValue", properties.getProperty("averageVolumeThreshold"));
+ }
+ }
return dimensionExploreYaml;
}
@@ -208,6 +231,33 @@ public class DetectionMigrationResource {
return new Period(TimeUnit.MILLISECONDS.convert(functionDTO.getBucketSize(), functionDTO.getBucketUnit())).toString();
}
+ private Map<String, Object> getPercentageChangeRuleDetectorParams(AnomalyFunctionDTO functionDTO) throws IOException {
+ Map<String, Object> detectorYaml = new LinkedHashMap<>();
+ Properties properties = AnomalyFunctionDTO.toProperties(functionDTO.getProperties());
+ double threshold = Double.valueOf(properties.getProperty("changeThreshold"));
+ if (properties.containsKey("changeThreshold")){
+ detectorYaml.put("percentageChange", Math.abs(threshold));
+ if (threshold > 0){
+ detectorYaml.put("pattern", "UP");
+ } else {
+ detectorYaml.put("pattern", "DOWN");
+ }
+ }
+ return detectorYaml;
+ }
+
+ private Map<String, Object> getMinMaxThresholdRuleDetectorParams(AnomalyFunctionDTO functionDTO) throws IOException {
+ Map<String, Object> detectorYaml = new LinkedHashMap<>();
+ Properties properties = AnomalyFunctionDTO.toProperties(functionDTO.getProperties());
+ if (properties.containsKey("min")){
+ detectorYaml.put("min", properties.getProperty("min"));
+ }
+ if (properties.containsKey("max")){
+ detectorYaml.put("max", properties.getProperty("max"));
+ }
+ return detectorYaml;
+ }
+
private Map<String, Object> getAlgorithmDetectorParams(AnomalyFunctionDTO functionDTO) throws Exception {
Map<String, Object> detectorYaml = new LinkedHashMap<>();
Map<String, Object> params = new LinkedHashMap<>();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
index a51a3a6..dd63d06 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
@@ -17,6 +17,7 @@
package com.linkedin.thirdeye.detection;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.linkedin.thirdeye.anomaly.detection.DetectionJobSchedulerUtils;
import com.linkedin.thirdeye.constant.AnomalyResultSource;
import com.linkedin.thirdeye.datalayer.bao.DatasetConfigManager;
import com.linkedin.thirdeye.datalayer.bao.DetectionConfigManager;
@@ -35,9 +36,11 @@ import com.linkedin.thirdeye.detection.finetune.GridSearchTuningAlgorithm;
import com.linkedin.thirdeye.detection.finetune.TuningAlgorithm;
import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
import com.wordnik.swagger.annotations.ApiParam;
+import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -51,7 +54,9 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.collections.MapUtils;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
+import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +66,9 @@ import org.slf4j.LoggerFactory;
public class DetectionResource {
private static final Logger LOG = LoggerFactory.getLogger(DetectionResource.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final String DAILY_CRON = "0 0 14 * * ? *";
+ private static final String HOURLY_CRON = "0 0 * * * ? *";
+ private static final String MINUTE_CRON = "0 0/15 * * * ? *";
private final MetricConfigManager metricDAO;
private final DatasetConfigManager datasetDAO;
@@ -164,11 +172,102 @@ public class DetectionResource {
return Response.ok(result).build();
}
+ // return default bucket size based on cron schedule.
+ private long getBucketSize(DetectionConfigDTO config){
+ switch (config.getCron()) {
+ case DAILY_CRON:
+ // daily
+ return TimeUnit.DAYS.toMillis(1);
+ case MINUTE_CRON:
+ // minute level
+ return TimeUnit.MINUTES.toMillis(15);
+ case HOURLY_CRON:
+ // hourly
+ return TimeUnit.HOURS.toMillis(1);
+ }
+ throw new IllegalArgumentException("bucket size not determined");
+ }
+
+ // return default window size based on cron schedule.
+ private long getWindowSize(DetectionConfigDTO config) {
+ switch (config.getCron()) {
+ case DAILY_CRON:
+ // daily
+ return TimeUnit.DAYS.toMillis(1);
+ case MINUTE_CRON:
+ // minute level
+ return TimeUnit.HOURS.toMillis(6);
+ case HOURLY_CRON:
+ // hourly
+ return TimeUnit.HOURS.toMillis(24);
+ }
+ throw new IllegalArgumentException("window size not determined");
+ }
+
+ /*
+ Generates monitoring window based on cron schedule.
+ */
+ private List<Interval> getReplayMonitoringWindows(DetectionConfigDTO config, long start, long end, Long windowSize, Long bucketSize) throws ParseException {
+ List<Interval> monitoringWindows = new ArrayList<>();
+ CronExpression cronExpression = new CronExpression(config.getCron());
+ DateTime currentStart = new DateTime(start);
+
+ long legacyWindowSize;
+ if(windowSize == null){
+ legacyWindowSize = getWindowSize(config);
+ LOG.warn("[Legacy replay] window size not set when replay {}. Use default window size {}", config.getId(), legacyWindowSize);
+ } else {
+ legacyWindowSize = windowSize;
+ }
+
+ long legacyBucketSize;
+ if (bucketSize == null){
+ legacyBucketSize = getBucketSize(config);
+ LOG.warn("[Legacy replay] bucket size not set when replay {}. Use default bucket size {}", config.getId(), legacyBucketSize);
+ } else {
+ legacyBucketSize = bucketSize;
+ }
+
+ // add offsets to that it would replay all the moving windows within start time and end time
+ currentStart = currentStart.plus(legacyWindowSize).minus(legacyBucketSize);
+ if (config.getCron().equals(DAILY_CRON)){
+ // daily detection offset by 1 to pick up the first moving window
+ currentStart.minus(1L);
+ }
+
+ DateTime currentEnd = new DateTime(cronExpression.getNextValidTimeAfter(currentStart.toDate()));
+ DateTime endBoundary = new DateTime(cronExpression.getNextValidTimeAfter(new Date(end)));
+ while (currentEnd.isBefore(endBoundary)) {
+ monitoringWindows.add(new Interval(currentStart.getMillis(), currentEnd.getMillis()));
+ currentStart = currentEnd;
+ currentEnd = new DateTime(cronExpression.getNextValidTimeAfter(currentStart.toDate()));
+ }
+ return monitoringWindows;
+ }
+
+ /**
+ * Legacy replay endpoint. Replay all the moving windows within start time and end time.
+ * Saves anomaly for each moving window before starting detection for next window.
+ * Behaves exactly like the legacy replay endpoint.
+ * See also {@link com.linkedin.thirdeye.dashboard.resources.DetectionJobResource#generateAnomaliesInRangeForFunctions(String, String, String, String, Boolean, Boolean)}}
+ * @param configId the config id to replay
+ * @param start start time
+ * @param end end time
+ * @param deleteExistingAnomaly (optional) delete existing anomaly or not
+ * @param windowSize (optional) override the default window size
+ * @param bucketSize (optional) override the default window size
+ * @return anomalies
+ * @throws Exception
+ */
@POST
- @Path("/replay/{id}")
- public Response detectionReplay(@PathParam("id") long configId, @QueryParam("start") long start,
+ @Path("/legacy-replay/{id}")
+ public Response legacyReplay(
+ @PathParam("id") long configId,
+ @QueryParam("start") long start,
@QueryParam("end") long end,
- @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean deleteExistingAnomaly) throws Exception {
+ @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean deleteExistingAnomaly,
+ @QueryParam("windowSize") Long windowSize,
+ @QueryParam("bucketSize") Long bucketSize) throws Exception {
DetectionConfigDTO config = this.configDAO.findById(configId);
if (config == null) {
@@ -189,6 +288,57 @@ public class DetectionResource {
}
// execute replay
+ List<Interval> monitoringWindows = getReplayMonitoringWindows(config, start, end, windowSize, bucketSize);
+ for (Interval monitoringWindow : monitoringWindows){
+ DetectionPipeline pipeline = this.loader.from(this.provider, config, monitoringWindow.getStartMillis(), monitoringWindow.getEndMillis());
+ DetectionPipelineResult result = pipeline.run();
+
+ // save state
+ if (result.getLastTimestamp() > 0) {
+ config.setLastTimestamp(result.getLastTimestamp());
+ }
+
+ this.configDAO.update(config);
+
+ for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
+ anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
+ this.anomalyDAO.save(anomaly);
+ }
+ }
+
+ Collection<MergedAnomalyResultDTO> replayResult = this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+
+ LOG.info("replay detection pipeline {} generated {} anomalies.", config.getId(), replayResult.size());
+ return Response.ok(replayResult).build();
+ }
+
+ /**
+ * Replay for a given time range. Without cron schedule behavior
+ */
+ @POST
+ @Path("/replay/{id}")
+ public Response detectionReplay(
+ @PathParam("id") long configId,
+ @QueryParam("start") long start,
+ @QueryParam("end") long end) throws Exception {
+
+ DetectionConfigDTO config = this.configDAO.findById(configId);
+ if (config == null) {
+ throw new IllegalArgumentException(String.format("Cannot find config %d", configId));
+ }
+
+ // clear existing anomalies
+ AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
+ Collection<MergedAnomalyResultDTO> existing = this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+
+ List<Long> existingIds = new ArrayList<>();
+ for (MergedAnomalyResultDTO anomaly : existing) {
+ existingIds.add(anomaly.getId());
+ }
+
+ this.anomalyDAO.deleteByIds(existingIds);
+
+ // execute replay
DetectionPipeline pipeline = this.loader.from(this.provider, config, start, end);
DetectionPipelineResult result = pipeline.run();
@@ -204,7 +354,6 @@ public class DetectionResource {
this.anomalyDAO.save(anomaly);
}
- LOG.info("replay detection pipeline {} generated {} anomalies.", config.getId(), result.getAnomalies().size());
- return Response.ok(result.getAnomalies()).build();
+ return Response.ok(result).build();
}
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
index f1bf20d..5f80b10 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
@@ -65,8 +65,10 @@ public class DetectionUtils {
}
// get the component name from the reference key
+ // example "$myRule:ALGORITHM:0" -> "myRule:ALGORITHM:0"
public static String getComponentName(String key) {
- return key.substring(1);
+ if (isReferenceName(key)) return key.substring(1);
+ else throw new IllegalArgumentException("not a component reference key. should starts with $");
}
// get the spec class name for a component class
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
index 43f9c27..19b13fb 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
@@ -46,7 +46,7 @@ public class MergeWrapper extends DetectionPipeline {
private static final String PROP_NESTED = "nested";
private static final String PROP_CLASS_NAME = "className";
private static final String PROP_MERGE_KEY = "mergeKey";
- private static final String PROP_DETECTOR_COMPONENT_KEY = "detectorComponentKey";
+ private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
protected static final Comparator<MergedAnomalyResultDTO> COMPARATOR = new Comparator<MergedAnomalyResultDTO>() {
@Override
@@ -67,7 +67,7 @@ public class MergeWrapper extends DetectionPipeline {
protected final List<Map<String, Object>> nestedProperties;
protected final long maxGap; // max time gap for merge
protected final long maxDuration; // max overall duration of merged anomaly
- private final AnomalySlice slice;
+ protected final AnomalySlice slice;
/**
* Instantiates a new merge wrapper.
@@ -181,7 +181,7 @@ public class MergeWrapper extends DetectionPipeline {
return output;
}
- private long getStartTime(Iterable<MergedAnomalyResultDTO> anomalies) {
+ protected long getStartTime(Iterable<MergedAnomalyResultDTO> anomalies) {
long time = this.startTime;
for (MergedAnomalyResultDTO anomaly : anomalies) {
time = Math.min(anomaly.getStartTime(), time);
@@ -189,7 +189,7 @@ public class MergeWrapper extends DetectionPipeline {
return time;
}
- private long getEndTime(Iterable<MergedAnomalyResultDTO> anomalies) {
+ protected long getEndTime(Iterable<MergedAnomalyResultDTO> anomalies) {
long time = this.endTime;
for (MergedAnomalyResultDTO anomaly : anomalies) {
time = Math.max(anomaly.getEndTime(), time);
@@ -214,7 +214,7 @@ public class MergeWrapper extends DetectionPipeline {
public static AnomalyKey from(MergedAnomalyResultDTO anomaly) {
return new AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), anomaly.getProperties().get(PROP_MERGE_KEY),
- anomaly.getProperties().get(PROP_DETECTOR_COMPONENT_KEY));
+ anomaly.getProperties().get(PROP_DETECTOR_COMPONENT_NAME));
}
@Override
@@ -227,12 +227,13 @@ public class MergeWrapper extends DetectionPipeline {
}
AnomalyKey that = (AnomalyKey) o;
return Objects.equals(metric, that.metric) && Objects.equals(collection, that.collection) && Objects.equals(
- dimensions, that.dimensions) && Objects.equals(mergeKey, that.mergeKey);
+ dimensions, that.dimensions) && Objects.equals(mergeKey, that.mergeKey) && Objects.equals(componentKey,
+ that.componentKey);
}
@Override
public int hashCode() {
- return Objects.hash(metric, collection, dimensions, mergeKey);
+ return Objects.hash(metric, collection, dimensions, mergeKey, componentKey);
}
}
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index a650550..0374892 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -18,8 +18,10 @@ package com.linkedin.thirdeye.detection.wrapper;
import com.google.common.base.Preconditions;
import com.linkedin.thirdeye.anomaly.detection.DetectionJobSchedulerUtils;
+import com.linkedin.thirdeye.anomalydetection.context.AnomalyResult;
import com.linkedin.thirdeye.api.TimeGranularity;
import com.linkedin.thirdeye.api.TimeSpec;
+import com.linkedin.thirdeye.datalayer.dto.AnomalyFunctionDTO;
import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
@@ -61,7 +63,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
private static final String PROP_WINDOW_UNIT = "windowUnit";
private static final String PROP_FREQUENCY = "frequency";
private static final String PROP_DETECTOR = "detector";
- private static final String PROP_DETECTOR_COMPONENT_KEY = "detectorComponentKey";
+ private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
+ private static final String PROP_TIMEZONE = "timezone";
private static final Logger LOG = LoggerFactory.getLogger(
AnomalyDetectorWrapper.class);
@@ -76,11 +79,13 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
private final MetricConfigDTO metric;
private final MetricEntity metricEntity;
private final boolean isMovingWindowDetection;
- private DatasetConfigDTO dataset;
- private DateTimeZone dateTimeZone;
// need to specify run frequency for minute level detection. Used for moving monitoring window alignment, default to be 15 minutes.
private final TimeGranularity functionFrequency;
- private final String detectorReferenceKey;
+ private final String detectorName;
+ private final long windowSizeMillis;
+ private final DatasetConfigDTO dataset;
+ private final DateTimeZone dateTimeZone;
+
public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
super(provider, config, startTime, endTime);
@@ -90,19 +95,30 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
this.metric = provider.fetchMetrics(Collections.singleton(this.metricEntity.getId())).get(this.metricEntity.getId());
Preconditions.checkArgument(this.config.getProperties().containsKey(PROP_DETECTOR));
- this.detectorReferenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_DETECTOR));
- Preconditions.checkArgument(this.config.getComponents().containsKey(this.detectorReferenceKey));
- this.anomalyDetector = (AnomalyDetector) this.config.getComponents().get(this.detectorReferenceKey);
+ this.detectorName = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_DETECTOR));
+ Preconditions.checkArgument(this.config.getComponents().containsKey(this.detectorName));
+ this.anomalyDetector = (AnomalyDetector) this.config.getComponents().get(this.detectorName);
+ // emulate moving window or now
this.isMovingWindowDetection = MapUtils.getBooleanValue(config.getProperties(), PROP_MOVING_WINDOW_DETECTION, false);
// delays to wait for data becomes available
this.windowDelay = MapUtils.getIntValue(config.getProperties(), PROP_WINDOW_DELAY, 0);
+ // window delay unit
this.windowDelayUnit = TimeUnit.valueOf(MapUtils.getString(config.getProperties(), PROP_WINDOW_DELAY_UNIT, "DAYS"));
// detection window size
this.windowSize = MapUtils.getIntValue(config.getProperties(), PROP_WINDOW_SIZE, 1);
this.windowUnit = TimeUnit.valueOf(MapUtils.getString(config.getProperties(), PROP_WINDOW_UNIT, "DAYS"));
+ this.windowSizeMillis = TimeUnit.MILLISECONDS.convert(windowSize, windowUnit);
+ // run frequency, used to determine moving windows for minute-level detection
Map<String, Object> frequency = MapUtils.getMap(config.getProperties(), PROP_FREQUENCY, Collections.emptyMap());
this.functionFrequency = new TimeGranularity(MapUtils.getIntValue(frequency, "size", 15), TimeUnit.valueOf(MapUtils.getString(frequency, "unit", "MINUTES")));
+
+ MetricEntity me = MetricEntity.fromURN(this.metricUrn);
+ MetricConfigDTO metricConfigDTO = this.provider.fetchMetrics(Collections.singletonList(me.getId())).get(me.getId());
+ this.dataset = this.provider.fetchDatasets(Collections.singletonList(metricConfigDTO.getDataset()))
+ .get(metricConfigDTO.getDataset());
+ // date time zone for moving windows. use dataset time zone as default
+ this.dateTimeZone = DateTimeZone.forID(MapUtils.getString(config.getProperties(), PROP_TIMEZONE, "America/Los_Angeles"));
}
@Override
@@ -112,6 +128,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
for (Interval window : monitoringWindows) {
List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>();
try {
+ LOG.info("[New Pipeline] running detection for config {} metricUrn {}. start time {}, end time{}", config.getId(), metricUrn, window.getStart(), window.getEnd());
anomaliesForOneWindow = anomalyDetector.runDetection(window, this.metricUrn);
} catch (Exception e) {
LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
@@ -125,27 +142,25 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
anomaly.setMetric(this.metric.getName());
anomaly.setCollection(this.metric.getDataset());
anomaly.setDimensions(DetectionUtils.toFilterMap(this.metricEntity.getFilters()));
- anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_KEY, this.detectorReferenceKey);
+ anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_NAME, this.detectorName);
}
return new DetectionPipelineResult(anomalies);
}
+ // get a list of the monitoring window, if no sliding window used, use start time and end time as window
List<Interval> getMonitoringWindows() {
if (this.isMovingWindowDetection) {
try{
List<Interval> monitoringWindows = new ArrayList<>();
- MetricEntity me = MetricEntity.fromURN(this.metricUrn);
- MetricConfigDTO metricConfigDTO =
- this.provider.fetchMetrics(Collections.singletonList(me.getId())).get(me.getId());
- dataset = this.provider.fetchDatasets(Collections.singletonList(metricConfigDTO.getDataset()))
- .get(metricConfigDTO.getDataset());
- dateTimeZone = DateTimeZone.forID(dataset.getTimezone());
List<Long> monitoringWindowEndTimes = getMonitoringWindowEndTimes();
for (long monitoringEndTime : monitoringWindowEndTimes) {
long endTime = monitoringEndTime - TimeUnit.MILLISECONDS.convert(windowDelay, windowDelayUnit);
- long startTime = endTime - TimeUnit.MILLISECONDS.convert(windowSize, windowUnit);
+ long startTime = endTime - this.windowSizeMillis;
monitoringWindows.add(new Interval(startTime, endTime, dateTimeZone));
}
+ for (Interval window : monitoringWindows){
+ LOG.info("running detections in windows {}", window);
+ }
return monitoringWindows;
} catch (Exception e) {
LOG.info("can't generate moving monitoring windows, calling with single detection window", e);
@@ -154,6 +169,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
return Collections.singletonList(new Interval(startTime, endTime));
}
+ // get the list of monitoring window end times
private List<Long> getMonitoringWindowEndTimes() {
List<Long> endTimes = new ArrayList<>();
@@ -169,6 +185,13 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
return endTimes;
}
+ /**
+ * round this time to earlier boundary, depending on granularity of dataset
+ * e.g. 12:15pm on HOURLY dataset should be treated as 12pm
+ * any dataset with granularity finer than HOUR, will be rounded as per function frequency (assumption is that this is in MINUTES)
+ * so 12.53 on 5 MINUTES dataset, with function frequency 15 MINUTES will be rounded to 12.45
+ * See also {@link DetectionJobSchedulerUtils#getBoundaryAlignedTimeForDataset(DateTime, TimeUnit)}
+ */
private long getBoundaryAlignedTimeForDataset(DateTime currentTime) {
TimeSpec timeSpec = ThirdEyeUtils.getTimeSpecFromDatasetConfig(dataset);
TimeUnit dataUnit = timeSpec.getDataGranularity().getUnit();
@@ -192,6 +215,12 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
return currentTime.getMillis();
}
+ /**
+ * get bucket size in millis, according to data granularity of dataset
+ * Bucket size are 1 HOUR for hourly, 1 DAY for daily
+ * For MINUTE level data, bucket size is calculated based on anomaly function frequency
+ * See also {@link DetectionJobSchedulerUtils#getBucketSizePeriodForDataset(DatasetConfigDTO, AnomalyFunctionDTO)} (DateTime, TimeUnit)}
+ */
public Period getBucketSizePeriodForDataset() {
Period bucketSizePeriod = null;
TimeSpec timeSpec = ThirdEyeUtils.getTimeSpecFromDatasetConfig(dataset);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
index 64ffc47..df08645 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
@@ -17,10 +17,13 @@
package com.linkedin.thirdeye.detection.wrapper;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.linkedin.thirdeye.dataframe.DoubleSeries;
import com.linkedin.thirdeye.dataframe.Series;
import com.linkedin.thirdeye.dataframe.util.MetricSlice;
import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
import com.linkedin.thirdeye.detection.DataProvider;
import com.linkedin.thirdeye.detection.DetectionUtils;
import com.linkedin.thirdeye.detection.DefaultInputDataFetcher;
@@ -29,11 +32,15 @@ import com.linkedin.thirdeye.detection.algorithm.MergeWrapper;
import com.linkedin.thirdeye.detection.components.RuleBaselineProvider;
import com.linkedin.thirdeye.detection.spec.RuleBaselineProviderSpec;
import com.linkedin.thirdeye.detection.spi.components.BaselineProvider;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
import com.linkedin.thirdeye.rootcause.timeseries.BaselineAggregateType;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +48,7 @@ import org.slf4j.LoggerFactory;
/**
* Baseline filling merger. This merger's merging behavior is the same as MergeWrapper. But add the capability
- * of filling baseline & current values.
+ * of filling baseline & current values and inject detector, metric urn. Each detector has a separate baseline filling merge wrapper
*/
public class BaselineFillingMergeWrapper extends MergeWrapper {
private static final Logger LOG = LoggerFactory.getLogger(MergeWrapper.class);
@@ -49,12 +56,15 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
private static final String PROP_BASELINE_PROVIDER = "baselineValueProvider";
private static final String PROP_CURRENT_PROVIDER = "currentValueProvider";
private static final String PROP_METRIC_URN = "metricUrn";
- private static final String PROP_BASELINE_PROVIDER_COMPONENT_KEY = "baselineProviderComponentKey";
+ private static final String PROP_BASELINE_PROVIDER_COMPONENT_NAME = "baselineProviderComponentName";
+ private static final String PROP_DETECTOR = "detector";
+ private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
private BaselineProvider baselineValueProvider; // optionally configure a baseline value loader
private BaselineProvider currentValueProvider;
- private Series.DoubleFunction aggregationFunction;
private String baselineProviderComponentKey;
+ private String detectorComponentName;
+ private String metricUrn;
public BaselineFillingMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime)
{
@@ -77,14 +87,24 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
InputDataFetcher dataFetcher = new DefaultInputDataFetcher(this.provider, this.config.getId());
this.currentValueProvider.init(spec, dataFetcher);
}
+
+ // inject detector to nested property if possible
+ String detectorComponentKey = MapUtils.getString(config.getProperties(), PROP_DETECTOR);
+ if (detectorComponentKey != null){
+ this.detectorComponentName = DetectionUtils.getComponentName(detectorComponentKey);
+ for (Map<String, Object> properties : this.nestedProperties){
+ properties.put(PROP_DETECTOR, detectorComponentKey);
+ }
+ }
+
+ // inject metricUrn to nested property if possible
String nestedUrn = MapUtils.getString(config.getProperties(), PROP_METRIC_URN);
if (nestedUrn != null){
+ this.metricUrn = nestedUrn;
for (Map<String, Object> properties : this.nestedProperties){
properties.put(PROP_METRIC_URN, nestedUrn);
}
}
-
- this.aggregationFunction = BaselineAggregateType.valueOf(MapUtils.getString(config.getProperties(), "metricFunction", "MEAN")).getFunction();
}
@Override
@@ -92,6 +112,26 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
return this.fillCurrentAndBaselineValue(super.merge(anomalies));
}
+ @Override
+ protected List<MergedAnomalyResultDTO> retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+ AnomalySlice effectiveSlice = this.slice
+ .withStart(this.getStartTime(generated) - this.maxGap - 1)
+ .withEnd(this.getEndTime(generated) + this.maxGap + 1);
+
+ Collection<MergedAnomalyResultDTO> retrieved = this.provider.fetchAnomalies(Collections.singleton(effectiveSlice), this.config.getId()).get(effectiveSlice);
+
+ Collection<MergedAnomalyResultDTO> anomalies =
+ Collections2.filter(retrieved,
+ mergedAnomaly -> mergedAnomaly != null &&
+ !mergedAnomaly.isChild() &&
+ // merge if only the anomaly generated by the same detector
+ this.detectorComponentName.equals(mergedAnomaly.getProperties().getOrDefault(PROP_DETECTOR_COMPONENT_NAME, "")) &&
+ // merge if only the anomaly is in the same dimension
+ this.metricUrn.equals(mergedAnomaly.getMetricUrn())
+ );
+ return new ArrayList<>(anomalies);
+ }
+
/**
* Fill in current and baseline value for the anomalies
* @param mergedAnomalies anomalies
@@ -101,12 +141,25 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
for (MergedAnomalyResultDTO anomaly : mergedAnomalies) {
try {
String metricUrn = anomaly.getMetricUrn();
- final MetricSlice slice = MetricSlice.from(MetricEntity.fromURN(metricUrn).getId(), anomaly.getStartTime(), anomaly.getEndTime(),
- MetricEntity.fromURN(metricUrn).getFilters());
+ MetricEntity me = MetricEntity.fromURN(metricUrn);
+ long metricId = me.getId();
+ MetricConfigDTO metricConfigDTO = this.provider.fetchMetrics(Collections.singletonList(metricId)).get(metricId);
+ // aggregation function
+ Series.DoubleFunction aggregationFunction = DoubleSeries.MEAN;
+
+ try {
+ aggregationFunction =
+ BaselineAggregateType.valueOf(metricConfigDTO.getDefaultAggFunction().name()).getFunction();
+ } catch (Exception e) {
+ LOG.warn("cannot get aggregation function for metric, using average", metricId);
+ }
+
+ final MetricSlice slice = MetricSlice.from(metricId, anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
anomaly.setAvgCurrentVal(this.currentValueProvider.computePredictedAggregates(slice, aggregationFunction));
if (this.baselineValueProvider != null) {
anomaly.setAvgBaselineVal(this.baselineValueProvider.computePredictedAggregates(slice, aggregationFunction));
- anomaly.getProperties().put(PROP_BASELINE_PROVIDER_COMPONENT_KEY, this.baselineProviderComponentKey);
+ anomaly.getProperties().put(PROP_BASELINE_PROVIDER_COMPONENT_NAME, this.baselineProviderComponentKey);
+ anomaly.setWeight(calculateWeight(anomaly));
}
} catch (Exception e) {
// ignore
@@ -116,4 +169,7 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
return mergedAnomalies;
}
+ private double calculateWeight(MergedAnomalyResultDTO anomaly) {
+ return (anomaly.getAvgCurrentVal() - anomaly.getAvgBaselineVal()) / anomaly.getAvgBaselineVal();
+ }
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
index 1382d9e..9ffc5c2 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
@@ -20,7 +20,6 @@ import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import com.linkedin.thirdeye.detection.DataProvider;
import com.linkedin.thirdeye.detection.algorithm.MergeWrapper;
-import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -37,7 +36,7 @@ import java.util.Map;
*/
public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
public ChildKeepingMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime)
- throws Exception {
+ {
super(provider, config, startTime, endTime);
}
@@ -105,6 +104,7 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
newAnomaly.setFeedback(anomaly.getFeedback());
newAnomaly.setAnomalyFeedbackId(anomaly.getAnomalyFeedbackId());
newAnomaly.setScore(anomaly.getScore());
+ newAnomaly.setWeight(anomaly.getWeight());
return newAnomaly;
}
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 45ecf73..45cad5b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -138,6 +138,7 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
private static final String PROP_WINDOW_UNIT = "windowUnit";
private static final String PROP_FREQUENCY = "frequency";
private static final String PROP_MERGER = "merger";
+ private static final String PROP_TIMEZONE = "timezone";
private static final DetectionRegistry DETECTION_REGISTRY = DetectionRegistry.getInstance();
private static final Map<String, String> DETECTOR_TO_BASELINE = ImmutableMap.of("ALGORITHM", "ALGORITHM_BASELINE");
@@ -227,7 +228,6 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
Map<String, Object> nestedProperties = new HashMap<>();
nestedProperties.put(PROP_CLASS_NAME, AnomalyDetectorWrapper.class.getName());
String detectorKey = makeComponentKey(ruleName, detectorType, id);
- nestedProperties.put(PROP_DETECTOR, detectorKey);
fillInWindowSizeAndUnit(nestedProperties, yamlConfig, detectorType);
@@ -242,6 +242,7 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
}
String baselineProviderKey = makeComponentKey(ruleName, baselineProviderType, id);
properties.put(PROP_BASELINE_PROVIDER, baselineProviderKey);
+ properties.put(PROP_DETECTOR, detectorKey);
buildComponentSpec(yamlConfig, baselineProviderType, baselineProviderKey);
properties.putAll(this.mergerProperties);
return properties;
@@ -283,6 +284,9 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
if (yamlConfig.containsKey(PROP_WINDOW_DELAY_UNIT)) {
properties.put(PROP_WINDOW_DELAY_UNIT, MapUtils.getString(yamlConfig, PROP_WINDOW_DELAY_UNIT));
}
+ if (yamlConfig.containsKey(PROP_TIMEZONE)){
+ properties.put(PROP_TIMEZONE, MapUtils.getString(yamlConfig, PROP_TIMEZONE));
+ }
}
}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
index ce1c03c..0d6891e 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
@@ -40,11 +40,11 @@ public class AnomalyDetectorWrapperTest {
private static final String PROP_METRIC_URN = "metricUrn";
private static final String PROP_MOVING_WINDOW_DETECTION = "isMovingWindowDetection";
private static final String PROP_DETECTOR = "detector";
+ private static final String PROP_TIMEZONE = "timezone";
private MockDataProvider provider;
private Map<String, Object> properties;
private DetectionConfigDTO config;
- private Map<String, Object> stageSpecs;
@BeforeMethod
public void setUp() {
@@ -80,6 +80,7 @@ public class AnomalyDetectorWrapperTest {
@Test
public void testMovingMonitoringWindow() {
this.properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+ this.properties.put(PROP_TIMEZONE, TimeSpec.DEFAULT_TIMEZONE);
AnomalyDetectorWrapper detectionPipeline =
new AnomalyDetectorWrapper(this.provider, this.config, 1540147725000L, 1540493325000L);
List<Interval> monitoringWindows = detectionPipeline.getMonitoringWindows();
@@ -89,4 +90,17 @@ public class AnomalyDetectorWrapperTest {
new Interval(1540252800000L, 1540339200000L, timeZone), new Interval(1540339200000L, 1540425600000L, timeZone)));
}
+ @Test
+ public void testMovingMonitoringWindowBoundary() {
+ this.properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+ this.properties.put(PROP_TIMEZONE, TimeSpec.DEFAULT_TIMEZONE);
+ AnomalyDetectorWrapper detectionPipeline =
+ new AnomalyDetectorWrapper(this.provider, this.config, 1540080000000L, 1540425600000L);
+ List<Interval> monitoringWindows = detectionPipeline.getMonitoringWindows();
+ DateTimeZone timeZone = DateTimeZone.forID(TimeSpec.DEFAULT_TIMEZONE);
+ Assert.assertEquals(monitoringWindows,
+ Arrays.asList(new Interval(1540080000000L, 1540166400000L, timeZone), new Interval(1540166400000L, 1540252800000L, timeZone),
+ new Interval(1540252800000L, 1540339200000L, timeZone), new Interval(1540339200000L, 1540425600000L, timeZone)));
+ }
+
}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
index 998c8eb..481c6f0 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
@@ -17,10 +17,12 @@
package com.linkedin.thirdeye.detection.wrapper;
import com.google.common.collect.ImmutableMap;
+import com.linkedin.thirdeye.constant.MetricAggFunction;
import com.linkedin.thirdeye.dataframe.DataFrame;
import com.linkedin.thirdeye.dataframe.util.MetricSlice;
import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
import com.linkedin.thirdeye.detection.DataProvider;
import com.linkedin.thirdeye.detection.DefaultInputDataFetcher;
import com.linkedin.thirdeye.detection.DetectionPipelineResult;
@@ -95,16 +97,22 @@ public class BaselineFillingMergeWrapperTest {
@Test
public void testMergerCurrentAndBaselineLoading() throws Exception {
MergedAnomalyResultDTO anomaly = makeAnomaly(3000, 3600);
+ anomaly.setProperties(ImmutableMap.of("detectorComponentName", "testDetector"));
anomaly.setMetricUrn("thirdeye:metric:1");
Map<MetricSlice, DataFrame> aggregates = new HashMap<>();
aggregates.put(MetricSlice.from(1, 3000, 3600), DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(-1, 100).build());
+ MetricConfigDTO metric = new MetricConfigDTO();
+ metric.setId(1L);
+ metric.setDefaultAggFunction(MetricAggFunction.SUM);
DataProvider
- provider = new MockDataProvider().setLoader(new MockPipelineLoader(this.runs, Collections.<MockPipelineOutput>emptyList())).setAnomalies(Collections.singletonList(anomaly)).setAggregates(aggregates);
+ provider = new MockDataProvider().setLoader(new MockPipelineLoader(this.runs, Collections.<MockPipelineOutput>emptyList())).setAnomalies(Collections.singletonList(anomaly)).setAggregates(aggregates)
+ .setMetrics(Collections.singletonList(metric));
this.config.getProperties().put(PROP_MAX_GAP, 100);
this.config.getProperties().put(PROP_BASELINE_PROVIDER, "$baseline");
+ this.config.getProperties().put("detector", "$testDetector");
BaselineProvider baselineProvider = new MockBaselineProvider();
MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
spec.setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), 100.0));
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
index 050043c..65fb1b5 100644
--- a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
+++ b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
@@ -17,9 +17,9 @@
"className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
"maxGap" : 0,
"nested" : [ {
- "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
- "detector" : "$rule1:THRESHOLD:0"
+ "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper"
} ],
+ "detector" : "$rule1:THRESHOLD:0",
"maxDuration" : 100
} ]
} ]
@@ -31,9 +31,9 @@
"className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
"maxGap" : 0,
"nested" : [ {
- "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
- "detector" : "$rule2:THRESHOLD:0"
+ "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper"
} ],
+ "detector" : "$rule2:THRESHOLD:0",
"maxDuration" : 100
} ]
} ],
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
index 2d875a4..bd43106 100644
--- a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
+++ b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
@@ -9,9 +9,9 @@
"baselineValueProvider" : "$rule1:RULE_BASELINE:0",
"className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
"nested" : [ {
- "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
- "detector" : "$rule1:THRESHOLD:0"
- } ]
+ "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper"
+ } ],
+ "detector" : "$rule1:THRESHOLD:0"
} ],
"minContribution" : 0.05,
"dimensions" : [ "D1", "D2" ]
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org