You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2018/11/30 01:16:15 UTC
[incubator-pinot] branch master updated: [TE] detection - filter
out child anomaly in anomaly fetching (#3570)
This is an automated email from the ASF dual-hosted git repository.
apucher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8b2efec [TE] detection - filter out child anomaly in anomaly fetching (#3570)
8b2efec is described below
commit 8b2efecfb9e4f626f7e5ec9a878d0264ed7482f2
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Thu Nov 29 17:16:10 2018 -0800
[TE] detection - filter out child anomaly in anomaly fetching (#3570)
- Filter out child anomaly in data provider anomaly fetching
- Fill in timezone/bucket for anomaly filter
- The final child-keeping merger (ChildKeepingMergeWrapper) no longer fetches anomalies from the database.
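- Replay: add a flag (deleteExistingAnomaly, default false) that controls whether existing anomalies are removed before replaying.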
---
.../thirdeye/detection/DefaultDataProvider.java | 9 +++---
.../detection/DetectionMigrationResource.java | 8 +++--
.../thirdeye/detection/DetectionResource.java | 35 ++++++++++++----------
.../thirdeye/detection/algorithm/MergeWrapper.java | 31 +++++++++++--------
.../wrapper/BaselineFillingMergeWrapper.java | 2 +-
.../wrapper/ChildKeepingMergeWrapper.java | 9 +++++-
.../wrapper/ChildKeepingMergeWrapperTest.java | 13 +++-----
7 files changed, 62 insertions(+), 45 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
index f600c44..c5f6ec3 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
@@ -17,6 +17,7 @@
package com.linkedin.thirdeye.detection;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Collections2;
import com.google.common.collect.Multimap;
import com.linkedin.thirdeye.dataframe.DataFrame;
import com.linkedin.thirdeye.dataframe.util.MetricSlice;
@@ -148,7 +149,7 @@ public class DefaultDataProvider implements DataProvider {
if (predicates.isEmpty()) throw new IllegalArgumentException("Must provide at least one of start, end, or " + functionIdKey);
- List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(AND(predicates));
+ Collection<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(AND(predicates));
anomalies.removeIf(anomaly -> !slice.match(anomaly));
if (isLegacy) {
@@ -160,12 +161,12 @@ public class DefaultDataProvider implements DataProvider {
(configId >= 0) && (anomaly.getDetectionConfigId() == null || anomaly.getDetectionConfigId() != configId)
);
}
+ // filter all child anomalies. those are kept in the parent anomaly children set.
+ anomalies = Collections2.filter(anomalies, mergedAnomaly -> mergedAnomaly != null && !mergedAnomaly.isChild());
- LOG.info("Fetched {} anomalies between (startTime = {}, endTime = {}) with confid Id = {}", anomalies.size(),
- slice.getStart(), slice.getEnd(), configId);
+ LOG.info("Fetched {} anomalies between (startTime = {}, endTime = {}) with confid Id = {}", anomalies.size(), slice.getStart(), slice.getEnd(), configId);
output.putAll(slice, anomalies);
}
-
return output;
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
index 3b15748..d4035aa 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
@@ -16,6 +16,7 @@
package com.linkedin.thirdeye.detection;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.linkedin.thirdeye.anomaly.detection.AnomalyDetectionInputContextBuilder;
import com.linkedin.thirdeye.datalayer.bao.AnomalyFunctionManager;
@@ -94,8 +95,9 @@ public class DetectionMigrationResource {
@GET
public String migrateToYaml(@QueryParam("id") long anomalyFunctionId) throws Exception {
AnomalyFunctionDTO anomalyFunctionDTO = this.anomalyFunctionDAO.findById(anomalyFunctionId);
+ Preconditions.checkArgument(anomalyFunctionDTO.getIsActive(), "try to migrate inactive anomaly function");
Map<String, Object> yamlConfigs = new LinkedHashMap<>();
- yamlConfigs.put("detectionName", anomalyFunctionDTO.getFunctionName());
+ yamlConfigs.put("detectionName", "new_pipeline_" + anomalyFunctionDTO.getFunctionName());
yamlConfigs.put("metric", anomalyFunctionDTO.getMetric());
yamlConfigs.put("dataset", anomalyFunctionDTO.getCollection());
yamlConfigs.put("pipelineType", "Composite");
@@ -192,8 +194,8 @@ public class DetectionMigrationResource {
Map<String, Object> params = new HashMap<>();
filterYamlParams.put("configuration", params);
params.putAll(functionDTO.getAlertFilter());
- params.put("variables.bucketPeriod", getBucketPeriod(functionDTO));
- params.put("variables.timeZone", getTimezone(functionDTO));
+ params.put("bucketPeriod", getBucketPeriod(functionDTO));
+ params.put("timeZone", getTimezone(functionDTO));
return filterYamlParams;
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
index e6dcad4..a51a3a6 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
@@ -41,6 +41,8 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
@@ -49,6 +51,7 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.collections.MapUtils;
+import org.joda.time.Interval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -155,7 +158,7 @@ public class DetectionResource {
DetectionPipelineResult result = pipeline.run();
if (diagnostics == null || !diagnostics) {
- result.setDiagnostics(Collections.<String, Object>emptyMap());
+ result.setDiagnostics(Collections.emptyMap());
}
return Response.ok(result).build();
@@ -163,27 +166,28 @@ public class DetectionResource {
@POST
@Path("/replay/{id}")
- public Response detectionReplay(
- @PathParam("id") long configId,
- @QueryParam("start") long start,
- @QueryParam("end") long end) throws Exception {
+ public Response detectionReplay(@PathParam("id") long configId, @QueryParam("start") long start,
+ @QueryParam("end") long end,
+ @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean deleteExistingAnomaly) throws Exception {
DetectionConfigDTO config = this.configDAO.findById(configId);
if (config == null) {
throw new IllegalArgumentException(String.format("Cannot find config %d", configId));
}
- // clear existing anomalies
AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
- Collection<MergedAnomalyResultDTO> existing = this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
-
- List<Long> existingIds = new ArrayList<>();
- for (MergedAnomalyResultDTO anomaly : existing) {
- existingIds.add(anomaly.getId());
+ if (deleteExistingAnomaly) {
+ // clear existing anomalies
+ Collection<MergedAnomalyResultDTO> existing =
+ this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+
+ List<Long> existingIds = new ArrayList<>();
+ for (MergedAnomalyResultDTO anomaly : existing) {
+ existingIds.add(anomaly.getId());
+ }
+ this.anomalyDAO.deleteByIds(existingIds);
}
- this.anomalyDAO.deleteByIds(existingIds);
-
// execute replay
DetectionPipeline pipeline = this.loader.from(this.provider, config, start, end);
DetectionPipelineResult result = pipeline.run();
@@ -200,6 +204,7 @@ public class DetectionResource {
this.anomalyDAO.save(anomaly);
}
- return Response.ok(result).build();
- }
+ LOG.info("replay detection pipeline {} generated {} anomalies.", config.getId(), result.getAnomalies().size());
+ return Response.ok(result.getAnomalies()).build();
}
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
index 144bf5f..43f9c27 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
@@ -30,9 +30,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import org.apache.commons.collections.MapUtils;
@@ -44,6 +46,7 @@ public class MergeWrapper extends DetectionPipeline {
private static final String PROP_NESTED = "nested";
private static final String PROP_CLASS_NAME = "className";
private static final String PROP_MERGE_KEY = "mergeKey";
+ private static final String PROP_DETECTOR_COMPONENT_KEY = "detectorComponentKey";
protected static final Comparator<MergedAnomalyResultDTO> COMPARATOR = new Comparator<MergedAnomalyResultDTO>() {
@Override
@@ -115,22 +118,23 @@ public class MergeWrapper extends DetectionPipeline {
i++;
}
- // retrieve anomalies
- AnomalySlice effectiveSlice = this.slice
- .withStart(this.getStartTime(generated) - this.maxGap - 1)
- .withEnd(this.getEndTime(generated) + this.maxGap + 1);
-
- List<MergedAnomalyResultDTO> retrieved = new ArrayList<>();
- retrieved.addAll(this.provider.fetchAnomalies(Collections.singleton(effectiveSlice), this.config.getId()).get(effectiveSlice));
-
// merge
- List<MergedAnomalyResultDTO> all = new ArrayList<>();
- all.addAll(retrieved);
+ Set<MergedAnomalyResultDTO> all = new HashSet<>();
+ all.addAll(retrieveAnomaliesFromDatabase(generated));
all.addAll(generated);
return new DetectionPipelineResult(this.merge(all)).setDiagnostics(diagnostics);
}
+ protected List<MergedAnomalyResultDTO> retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+ AnomalySlice effectiveSlice = this.slice
+ .withStart(this.getStartTime(generated) - this.maxGap - 1)
+ .withEnd(this.getEndTime(generated) + this.maxGap + 1);
+
+ return new ArrayList<>(this.provider.fetchAnomalies(Collections.singleton(effectiveSlice), this.config.getId()).get(effectiveSlice));
+ }
+
+ // logic to do time-based merging.
protected List<MergedAnomalyResultDTO> merge(Collection<MergedAnomalyResultDTO> anomalies) {
List<MergedAnomalyResultDTO> input = new ArrayList<>(anomalies);
Collections.sort(input, COMPARATOR);
@@ -198,16 +202,19 @@ public class MergeWrapper extends DetectionPipeline {
final String collection;
final DimensionMap dimensions;
final String mergeKey;
+ final String componentKey;
- public AnomalyKey(String metric, String collection, DimensionMap dimensions, String mergeKey) {
+ public AnomalyKey(String metric, String collection, DimensionMap dimensions, String mergeKey, String componentKey) {
this.metric = metric;
this.collection = collection;
this.dimensions = dimensions;
this.mergeKey = mergeKey;
+ this.componentKey = componentKey;
}
public static AnomalyKey from(MergedAnomalyResultDTO anomaly) {
- return new AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), anomaly.getProperties().get(PROP_MERGE_KEY));
+ return new AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), anomaly.getProperties().get(PROP_MERGE_KEY),
+ anomaly.getProperties().get(PROP_DETECTOR_COMPONENT_KEY));
}
@Override
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
index d743971..64ffc47 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
@@ -110,7 +110,7 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
}
} catch (Exception e) {
// ignore
- LOG.warn("cannot get metric slice for anomaly {}", anomaly, e);
+ LOG.warn("cannot get current or baseline value for anomaly {}. ", anomaly, e);
}
}
return mergedAnomalies;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
index 77536f9..1382d9e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
@@ -20,6 +20,7 @@ import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import com.linkedin.thirdeye.detection.DataProvider;
import com.linkedin.thirdeye.detection.algorithm.MergeWrapper;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -41,6 +42,12 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
}
@Override
+ // does not fetch any anomalies from database
+ protected List<MergedAnomalyResultDTO> retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+ return Collections.emptyList();
+ }
+
+ @Override
protected List<MergedAnomalyResultDTO> merge(Collection<MergedAnomalyResultDTO> anomalies) {
List<MergedAnomalyResultDTO> input = new ArrayList<>(anomalies);
Collections.sort(input, MergeWrapper.COMPARATOR);
@@ -54,7 +61,7 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
}
MergeWrapper.AnomalyKey
- key = new MergeWrapper.AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), "");
+ key = new MergeWrapper.AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), "", "");
MergedAnomalyResultDTO parent = parents.get(key);
if (parent == null || anomaly.getStartTime() - parent.getEndTime() > this.maxGap) {
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
index 4162940..4a75289 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
@@ -18,14 +18,12 @@ package com.linkedin.thirdeye.detection.wrapper;
import com.google.common.collect.ImmutableSet;
import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
-import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import com.linkedin.thirdeye.detection.DataProvider;
import com.linkedin.thirdeye.detection.DetectionPipelineResult;
import com.linkedin.thirdeye.detection.MockDataProvider;
import com.linkedin.thirdeye.detection.MockPipeline;
import com.linkedin.thirdeye.detection.MockPipelineLoader;
import com.linkedin.thirdeye.detection.MockPipelineOutput;
-import com.linkedin.thirdeye.detection.wrapper.ChildKeepingMergeWrapper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -86,26 +84,23 @@ public class ChildKeepingMergeWrapperTest {
this.config.setName(PROP_NAME_VALUE);
this.config.setProperties(this.properties);
- List<MergedAnomalyResultDTO> existing = new ArrayList<>();
- existing.add(makeAnomaly(0, 1000));
- existing.add(makeAnomaly(1500, 2000));
-
this.outputs = new ArrayList<>();
this.outputs.add(new MockPipelineOutput(Arrays.asList(
makeAnomaly(1100, 1200),
- makeAnomaly(2200, 2300)
+ makeAnomaly(2200, 2300),
+ makeAnomaly(0, 1000)
), 2900));
this.outputs.add(new MockPipelineOutput(Arrays.asList(
makeAnomaly(1150, 1250),
- makeAnomaly(2400, 2800)
+ makeAnomaly(2400, 2800),
+ makeAnomaly(1500, 2000)
), 3000));
MockPipelineLoader mockLoader = new MockPipelineLoader(this.runs, this.outputs);
this.provider = new MockDataProvider()
- .setAnomalies(existing)
.setLoader(mockLoader);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org