You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2018/11/30 01:16:15 UTC

[incubator-pinot] branch master updated: [TE] detection - filter out child anomaly in anomaly fetching (#3570)

This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b2efec  [TE] detection - filter out child anomaly in anomaly fetching (#3570)
8b2efec is described below

commit 8b2efecfb9e4f626f7e5ec9a878d0264ed7482f2
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Thu Nov 29 17:16:10 2018 -0800

    [TE] detection - filter out child anomaly in anomaly fetching (#3570)
    
    - Filter out child anomalies when the data provider fetches anomalies
    - Fill in timezone/bucket for anomaly filter
    - The final child-keeping merger (ChildKeepingMergeWrapper) no longer fetches anomalies from the database.
    - Add a `deleteExistingAnomaly` flag to replay (default false); existing anomalies are removed only when it is set.
---
 .../thirdeye/detection/DefaultDataProvider.java    |  9 +++---
 .../detection/DetectionMigrationResource.java      |  8 +++--
 .../thirdeye/detection/DetectionResource.java      | 35 ++++++++++++----------
 .../thirdeye/detection/algorithm/MergeWrapper.java | 31 +++++++++++--------
 .../wrapper/BaselineFillingMergeWrapper.java       |  2 +-
 .../wrapper/ChildKeepingMergeWrapper.java          |  9 +++++-
 .../wrapper/ChildKeepingMergeWrapperTest.java      | 13 +++-----
 7 files changed, 62 insertions(+), 45 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
index f600c44..c5f6ec3 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
@@ -17,6 +17,7 @@
 package com.linkedin.thirdeye.detection;
 
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.Multimap;
 import com.linkedin.thirdeye.dataframe.DataFrame;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
@@ -148,7 +149,7 @@ public class DefaultDataProvider implements DataProvider {
 
       if (predicates.isEmpty()) throw new IllegalArgumentException("Must provide at least one of start, end, or " + functionIdKey);
 
-      List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(AND(predicates));
+      Collection<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(AND(predicates));
       anomalies.removeIf(anomaly -> !slice.match(anomaly));
 
       if (isLegacy) {
@@ -160,12 +161,12 @@ public class DefaultDataProvider implements DataProvider {
             (configId >= 0) && (anomaly.getDetectionConfigId() == null || anomaly.getDetectionConfigId() != configId)
         );
       }
+      // filter all child anomalies. those are kept in the parent anomaly children set.
+      anomalies = Collections2.filter(anomalies, mergedAnomaly -> mergedAnomaly != null && !mergedAnomaly.isChild());
 
-      LOG.info("Fetched {} anomalies between (startTime = {}, endTime = {}) with confid Id = {}", anomalies.size(),
-          slice.getStart(), slice.getEnd(), configId);
+      LOG.info("Fetched {} anomalies between (startTime = {}, endTime = {}) with confid Id = {}", anomalies.size(), slice.getStart(), slice.getEnd(), configId);
       output.putAll(slice, anomalies);
     }
-
     return output;
   }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
index 3b15748..d4035aa 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
@@ -16,6 +16,7 @@
 
 package com.linkedin.thirdeye.detection;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.linkedin.thirdeye.anomaly.detection.AnomalyDetectionInputContextBuilder;
 import com.linkedin.thirdeye.datalayer.bao.AnomalyFunctionManager;
@@ -94,8 +95,9 @@ public class DetectionMigrationResource {
   @GET
   public String migrateToYaml(@QueryParam("id") long anomalyFunctionId) throws Exception {
     AnomalyFunctionDTO anomalyFunctionDTO = this.anomalyFunctionDAO.findById(anomalyFunctionId);
+    Preconditions.checkArgument(anomalyFunctionDTO.getIsActive(), "try to migrate inactive anomaly function");
     Map<String, Object> yamlConfigs = new LinkedHashMap<>();
-    yamlConfigs.put("detectionName", anomalyFunctionDTO.getFunctionName());
+    yamlConfigs.put("detectionName", "new_pipeline_" + anomalyFunctionDTO.getFunctionName());
     yamlConfigs.put("metric", anomalyFunctionDTO.getMetric());
     yamlConfigs.put("dataset", anomalyFunctionDTO.getCollection());
     yamlConfigs.put("pipelineType", "Composite");
@@ -192,8 +194,8 @@ public class DetectionMigrationResource {
     Map<String, Object> params = new HashMap<>();
     filterYamlParams.put("configuration", params);
     params.putAll(functionDTO.getAlertFilter());
-    params.put("variables.bucketPeriod", getBucketPeriod(functionDTO));
-    params.put("variables.timeZone", getTimezone(functionDTO));
+    params.put("bucketPeriod", getBucketPeriod(functionDTO));
+    params.put("timeZone", getTimezone(functionDTO));
     return filterYamlParams;
   }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
index e6dcad4..a51a3a6 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
@@ -41,6 +41,8 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
@@ -49,6 +51,7 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.commons.collections.MapUtils;
+import org.joda.time.Interval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -155,7 +158,7 @@ public class DetectionResource {
     DetectionPipelineResult result = pipeline.run();
 
     if (diagnostics == null || !diagnostics) {
-      result.setDiagnostics(Collections.<String, Object>emptyMap());
+      result.setDiagnostics(Collections.emptyMap());
     }
 
     return Response.ok(result).build();
@@ -163,27 +166,28 @@ public class DetectionResource {
 
   @POST
   @Path("/replay/{id}")
-  public Response detectionReplay(
-      @PathParam("id") long configId,
-      @QueryParam("start") long start,
-      @QueryParam("end") long end) throws Exception {
+  public Response detectionReplay(@PathParam("id") long configId, @QueryParam("start") long start,
+      @QueryParam("end") long end,
+      @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean deleteExistingAnomaly) throws Exception {
 
     DetectionConfigDTO config = this.configDAO.findById(configId);
     if (config == null) {
       throw new IllegalArgumentException(String.format("Cannot find config %d", configId));
     }
 
-    // clear existing anomalies
     AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
-    Collection<MergedAnomalyResultDTO> existing = this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
-
-    List<Long> existingIds = new ArrayList<>();
-    for (MergedAnomalyResultDTO anomaly : existing) {
-      existingIds.add(anomaly.getId());
+    if (deleteExistingAnomaly) {
+      // clear existing anomalies
+      Collection<MergedAnomalyResultDTO> existing =
+          this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+
+      List<Long> existingIds = new ArrayList<>();
+      for (MergedAnomalyResultDTO anomaly : existing) {
+        existingIds.add(anomaly.getId());
+      }
+      this.anomalyDAO.deleteByIds(existingIds);
     }
 
-    this.anomalyDAO.deleteByIds(existingIds);
-
     // execute replay
     DetectionPipeline pipeline = this.loader.from(this.provider, config, start, end);
     DetectionPipelineResult result = pipeline.run();
@@ -200,6 +204,7 @@ public class DetectionResource {
       this.anomalyDAO.save(anomaly);
     }
 
-    return Response.ok(result).build();
-  }
+    LOG.info("replay detection pipeline {} generated {} anomalies.", config.getId(), result.getAnomalies().size());
+    return Response.ok(result.getAnomalies()).build();
   }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
index 144bf5f..43f9c27 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
@@ -30,9 +30,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import org.apache.commons.collections.MapUtils;
 
 
@@ -44,6 +46,7 @@ public class MergeWrapper extends DetectionPipeline {
   private static final String PROP_NESTED = "nested";
   private static final String PROP_CLASS_NAME = "className";
   private static final String PROP_MERGE_KEY = "mergeKey";
+  private static final String PROP_DETECTOR_COMPONENT_KEY = "detectorComponentKey";
 
   protected static final Comparator<MergedAnomalyResultDTO> COMPARATOR = new Comparator<MergedAnomalyResultDTO>() {
     @Override
@@ -115,22 +118,23 @@ public class MergeWrapper extends DetectionPipeline {
       i++;
     }
 
-    // retrieve anomalies
-    AnomalySlice effectiveSlice = this.slice
-        .withStart(this.getStartTime(generated) - this.maxGap - 1)
-        .withEnd(this.getEndTime(generated) + this.maxGap + 1);
-
-    List<MergedAnomalyResultDTO> retrieved = new ArrayList<>();
-    retrieved.addAll(this.provider.fetchAnomalies(Collections.singleton(effectiveSlice), this.config.getId()).get(effectiveSlice));
-
     // merge
-    List<MergedAnomalyResultDTO> all = new ArrayList<>();
-    all.addAll(retrieved);
+    Set<MergedAnomalyResultDTO> all = new HashSet<>();
+    all.addAll(retrieveAnomaliesFromDatabase(generated));
     all.addAll(generated);
 
     return new DetectionPipelineResult(this.merge(all)).setDiagnostics(diagnostics);
   }
 
+  protected List<MergedAnomalyResultDTO> retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+    AnomalySlice effectiveSlice = this.slice
+        .withStart(this.getStartTime(generated) - this.maxGap - 1)
+        .withEnd(this.getEndTime(generated) + this.maxGap + 1);
+
+    return new ArrayList<>(this.provider.fetchAnomalies(Collections.singleton(effectiveSlice), this.config.getId()).get(effectiveSlice));
+  }
+
+  // logic to do time-based merging.
   protected List<MergedAnomalyResultDTO> merge(Collection<MergedAnomalyResultDTO> anomalies) {
     List<MergedAnomalyResultDTO> input = new ArrayList<>(anomalies);
     Collections.sort(input, COMPARATOR);
@@ -198,16 +202,19 @@ public class MergeWrapper extends DetectionPipeline {
     final String collection;
     final DimensionMap dimensions;
     final String mergeKey;
+    final String componentKey;
 
-    public AnomalyKey(String metric, String collection, DimensionMap dimensions, String mergeKey) {
+    public AnomalyKey(String metric, String collection, DimensionMap dimensions, String mergeKey, String componentKey) {
       this.metric = metric;
       this.collection = collection;
       this.dimensions = dimensions;
       this.mergeKey = mergeKey;
+      this.componentKey = componentKey;
     }
 
     public static AnomalyKey from(MergedAnomalyResultDTO anomaly) {
-      return new AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), anomaly.getProperties().get(PROP_MERGE_KEY));
+      return new AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), anomaly.getProperties().get(PROP_MERGE_KEY),
+          anomaly.getProperties().get(PROP_DETECTOR_COMPONENT_KEY));
     }
 
     @Override
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
index d743971..64ffc47 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
@@ -110,7 +110,7 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
         }
       } catch (Exception e) {
         // ignore
-        LOG.warn("cannot get metric slice for anomaly {}", anomaly, e);
+        LOG.warn("cannot get current or baseline value for anomaly {}. ", anomaly, e);
       }
     }
     return mergedAnomalies;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
index 77536f9..1382d9e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
@@ -20,6 +20,7 @@ import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.algorithm.MergeWrapper;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -41,6 +42,12 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
   }
 
   @Override
+  // does not fetch any anomalies from database
+  protected List<MergedAnomalyResultDTO> retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+    return Collections.emptyList();
+  }
+
+  @Override
   protected List<MergedAnomalyResultDTO> merge(Collection<MergedAnomalyResultDTO> anomalies) {
     List<MergedAnomalyResultDTO> input = new ArrayList<>(anomalies);
     Collections.sort(input, MergeWrapper.COMPARATOR);
@@ -54,7 +61,7 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
       }
 
       MergeWrapper.AnomalyKey
-          key = new MergeWrapper.AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), "");
+          key = new MergeWrapper.AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), "", "");
       MergedAnomalyResultDTO parent = parents.get(key);
 
       if (parent == null || anomaly.getStartTime() - parent.getEndTime() > this.maxGap) {
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
index 4162940..4a75289 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
@@ -18,14 +18,12 @@ package com.linkedin.thirdeye.detection.wrapper;
 
 import com.google.common.collect.ImmutableSet;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
-import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
 import com.linkedin.thirdeye.detection.MockDataProvider;
 import com.linkedin.thirdeye.detection.MockPipeline;
 import com.linkedin.thirdeye.detection.MockPipelineLoader;
 import com.linkedin.thirdeye.detection.MockPipelineOutput;
-import com.linkedin.thirdeye.detection.wrapper.ChildKeepingMergeWrapper;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -86,26 +84,23 @@ public class ChildKeepingMergeWrapperTest {
     this.config.setName(PROP_NAME_VALUE);
     this.config.setProperties(this.properties);
 
-    List<MergedAnomalyResultDTO> existing = new ArrayList<>();
-    existing.add(makeAnomaly(0, 1000));
-    existing.add(makeAnomaly(1500, 2000));
-
     this.outputs = new ArrayList<>();
 
     this.outputs.add(new MockPipelineOutput(Arrays.asList(
         makeAnomaly(1100, 1200),
-        makeAnomaly(2200, 2300)
+        makeAnomaly(2200, 2300),
+        makeAnomaly(0, 1000)
     ), 2900));
 
     this.outputs.add(new MockPipelineOutput(Arrays.asList(
         makeAnomaly(1150, 1250),
-        makeAnomaly(2400, 2800)
+        makeAnomaly(2400, 2800),
+        makeAnomaly(1500, 2000)
     ), 3000));
 
     MockPipelineLoader mockLoader = new MockPipelineLoader(this.runs, this.outputs);
 
     this.provider = new MockDataProvider()
-        .setAnomalies(existing)
         .setLoader(mockLoader);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org