You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2019/05/10 21:25:23 UTC

[incubator-pinot] branch update_merge_logic created (now 4896956)

This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a change to branch update_merge_logic
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 4896956  [TE] Update anomaly merge logic 1. Don't output existing anomalies that do not get merged. 2. Merge new anomaly's properties into existing anomaly. 3. Do not split existing anomalies.

This branch includes the following new commits:

     new 4896956  [TE] Update anomaly merge logic 1. Don't output existing anomalies that do not get merged. 2. Merge new anomaly's properties into existing anomaly. 3. Do not split existing anomalies.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: [TE] Update anomaly merge logic 1. Don't output existing anomalies that do not get merged. 2. Merge new anomaly's properties into existing anomaly. 3. Do not split existing anomalies.

Posted by xh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch update_merge_logic
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 48969567d08c706a66c7bcbea5365bf32300cd74
Author: Xiaohui Sun <xh...@xhsun-mn3.linkedin.biz>
AuthorDate: Fri May 10 14:25:00 2019 -0700

    [TE] Update anomaly merge logic
    1. Don't output existing anomalies that do not get merged.
    2. Merge new anomaly's properties into existing anomaly.
    3. Do not split existing anomalies.
---
 .../thirdeye/detection/algorithm/MergeWrapper.java |  73 +++++++++---
 .../thirdeye/detection/DetectionTestUtils.java     |   6 +
 .../detection/algorithm/MergeWrapperTest.java      | 125 ++++++++++++---------
 3 files changed, 137 insertions(+), 67 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
index 5acf958..fc64501 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.commons.collections.MapUtils;
 import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
@@ -146,11 +147,18 @@ public class MergeWrapper extends DetectionPipeline {
     return new ArrayList<>(this.provider.fetchAnomalies(Collections.singleton(effectiveSlice), this.config.getId()).get(effectiveSlice));
   }
 
-  // logic to do time-based merging.
+  private boolean isExistingAnomaly(MergedAnomalyResultDTO anomaly) {
+    return anomaly.getId() != null;
+  }
+
+  // Merge new anomalies into existing anomalies. Return the anomalies that need to be updated or added.
+  // An existing anomaly that is not updated is not returned.
   protected List<MergedAnomalyResultDTO> merge(Collection<MergedAnomalyResultDTO> anomalies) {
     List<MergedAnomalyResultDTO> input = new ArrayList<>(enforceMaxDuration(anomalies));
     Collections.sort(input, COMPARATOR);
 
+    // stores the ids of all existing anomalies that need to be modified
+    Set<Long> modifiedExistingIds = new HashSet<>();
     List<MergedAnomalyResultDTO> output = new ArrayList<>();
 
     Map<AnomalyKey, MergedAnomalyResultDTO> parents = new HashMap<>();
@@ -163,16 +171,38 @@ public class MergeWrapper extends DetectionPipeline {
       MergedAnomalyResultDTO parent = parents.get(key);
 
       if (parent == null || anomaly.getStartTime() - parent.getEndTime() > this.maxGap) {
-        // no parent, too far away
+        // no parent, too far away to merge
+        //      parent |-------------|
+        //                                  anomaly |---------------|
+        //
         parents.put(key, anomaly);
-        output.add(anomaly);
-
+        if (!isExistingAnomaly(anomaly)) {
+          output.add(anomaly);
+        }
       } else if (anomaly.getEndTime() <= parent.getEndTime() || anomaly.getEndTime() - parent.getStartTime() <= this.maxDuration) {
-        // fully merge into existing
+        // fully cover
+        //      parent |-------------------|
+        //              anomaly |-------------|
+        // or mergeable
+        //      parent |-------------------|
+        //                      anomaly |-------------|
+        // or small gap
+        //      parent |-------------------|
+        //                                 anomaly |-------------|
+        //
         parent.setEndTime(Math.max(parent.getEndTime(), anomaly.getEndTime()));
-
+        // merge the anomaly's properties into parent
+        Map<String, String> properties = parent.getProperties();
+        properties.putAll(anomaly.getProperties());
+        parent.setProperties(properties);
+        if (isExistingAnomaly(parent)) {
+          modifiedExistingIds.add(parent.getId());
+        }
       } else if (parent.getEndTime() >= anomaly.getStartTime()) {
-        // partially merge, truncate new
+        // mergeable but exceeds maxDuration, then truncate
+        //      parent |---------------------|
+        //                        anomaly |------------------------|
+        //
         long truncationTimestamp = Math.max(parent.getEndTime(), parent.getStartTime() + this.maxDuration);
 
         parent.setEndTime(truncationTimestamp);
@@ -180,29 +210,42 @@ public class MergeWrapper extends DetectionPipeline {
         anomaly.setEndTime(Math.max(truncationTimestamp, anomaly.getEndTime()));
 
         parents.put(key, anomaly);
-        output.add(anomaly);
-
+        if (!isExistingAnomaly(anomaly)) {
+          output.add(anomaly);
+        }
+        if (isExistingAnomaly(parent)) {
+          modifiedExistingIds.add(parent.getId());
+        }
       } else {
         // default to new parent if merge not possible
         parents.put(key, anomaly);
-        output.add(anomaly);
-
+        if (!isExistingAnomaly(anomaly)) {
+          output.add(anomaly);
+        }
       }
     }
 
-    return output;
+    // add modified existing anomalies into output
+    output.addAll(input.stream().filter(x -> x.getId()!= null && modifiedExistingIds.contains(x.getId())).collect(Collectors.toList()));
+
+    return new ArrayList<>(output);
   }
 
   /*
     Make sure that the anomalies generated from detector is shorter than maxDuration. Otherwise, split the anomaly
+    Do not split an anomaly if it is an existing anomaly.
    */
   private Collection<MergedAnomalyResultDTO> enforceMaxDuration(Collection<MergedAnomalyResultDTO> anomalies) {
     Set<MergedAnomalyResultDTO> result = new HashSet<>();
     for (MergedAnomalyResultDTO anomaly : anomalies) {
-      if (anomaly.getEndTime() - anomaly.getStartTime() > this.maxDuration) {
-        result.addAll(splitAnomaly(anomaly, this.maxDuration));
-      } else {
+      if (isExistingAnomaly(anomaly)) {
         result.add(anomaly);
+      } else {
+        if (anomaly.getEndTime() - anomaly.getStartTime() > this.maxDuration) {
+          result.addAll(splitAnomaly(anomaly, this.maxDuration));
+        } else {
+          result.add(anomaly);
+        }
       }
     }
     return result;
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java
index 2378dad..6ea9c96 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java
@@ -49,6 +49,12 @@ public class DetectionTestUtils {
     return DetectionTestUtils.makeAnomaly(configId, null, start, end, metric, dataset, dimensions);
   }
 
+  public static MergedAnomalyResultDTO makeAnomaly(long start, long end, long id) {
+    MergedAnomalyResultDTO anomalyResultDTO = makeAnomaly(start, end);
+    anomalyResultDTO.setId(id);
+    return anomalyResultDTO;
+  }
+
   public static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
     return DetectionTestUtils.makeAnomaly(PROP_ID_VALUE, start, end, null, null, Collections.<String, String>emptyMap());
   }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
index 1f679a0..29f08e4 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
@@ -59,6 +59,18 @@ public class MergeWrapperTest {
   private static final String PROP_MAX_GAP = "maxGap";
   private static final String PROP_MAX_DURATION = "maxDuration";
 
+  /*
+    Here are the anomalies in the test.
+
+    Existing anomalies:
+    0                1000             1500       2000
+    |-----------------|                |----------|
+    New  anomalies:
+                       1100  1200                     2200  2300
+                        |-----|                         |-----|
+                           1150 1250                            2400          2800
+                            |-----|                               |------------|
+  */
   @BeforeMethod
   public void beforeMethod() {
     this.runs = new ArrayList<>();
@@ -87,26 +99,19 @@ public class MergeWrapperTest {
     this.config.setProperties(this.properties);
 
     List<MergedAnomalyResultDTO> existing = new ArrayList<>();
-    existing.add(makeAnomaly(0, 1000));
-    existing.add(makeAnomaly(1500, 2000));
+    // For existing anomalies add ids.
+    existing.add(makeAnomaly(0, 1000, 0));
+    existing.add(makeAnomaly(1500, 2000, 1));
 
     this.outputs = new ArrayList<>();
 
-    this.outputs.add(new MockPipelineOutput(Arrays.asList(
-        makeAnomaly(1100, 1200),
-        makeAnomaly(2200, 2300)
-    ), 2900));
+    this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(1100, 1200), makeAnomaly(2200, 2300)), 2900));
 
-    this.outputs.add(new MockPipelineOutput(Arrays.asList(
-        makeAnomaly(1150, 1250),
-        makeAnomaly(2400, 2800)
-    ), 3000));
+    this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(1150, 1250), makeAnomaly(2400, 2800)), 3000));
 
     this.mockLoader = new MockPipelineLoader(this.runs, this.outputs);
 
-    this.provider = new MockDataProvider()
-        .setAnomalies(existing)
-        .setLoader(this.mockLoader);
+    this.provider = new MockDataProvider().setAnomalies(existing).setLoader(this.mockLoader);
   }
 
   @Test
@@ -115,7 +120,10 @@ public class MergeWrapperTest {
     this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 3000);
     DetectionPipelineResult output = this.wrapper.run();
 
-    Assert.assertEquals(output.getAnomalies().size(), 5);
+    Assert.assertEquals(output.getAnomalies().size(), 3);
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1100, 1250)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2300)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800)));
     Assert.assertEquals(output.getLastTimestamp(), 3000);
   }
 
@@ -126,10 +134,12 @@ public class MergeWrapperTest {
     this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 3000);
     DetectionPipelineResult output = this.wrapper.run();
 
-    Assert.assertEquals(output.getAnomalies().size(), 3);
+    // anomaly [1500, 2000] is not modified
+    Assert.assertEquals(output.getAnomalies().size(), 2);
     Assert.assertEquals(output.getLastTimestamp(), 3000);
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250)));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2000)));
+    // anomalies [1100, 1200] and [1150,1250] are merged into [0, 1000]
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, 0)));
+    // anomalies [2200, 2300] and [2400, 2800] are merged
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2800)));
   }
 
@@ -143,8 +153,8 @@ public class MergeWrapperTest {
 
     Assert.assertEquals(output.getAnomalies().size(), 3);
     Assert.assertEquals(output.getLastTimestamp(), 3000);
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250)));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, 0)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300, 1)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800)));
   }
 
@@ -153,10 +163,7 @@ public class MergeWrapperTest {
     this.config.getProperties().put(PROP_MAX_GAP, 200);
     this.config.getProperties().put(PROP_MAX_DURATION, 1250);
 
-    this.outputs.add(new MockPipelineOutput(Arrays.asList(
-        makeAnomaly(2800, 3700),
-        makeAnomaly(3700, 3800)
-    ), 3700));
+    this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(2800, 3700), makeAnomaly(3700, 3800)), 3700));
 
     Map<String, Object> nestedProperties = new HashMap<>();
     nestedProperties.put(PROP_CLASS_NAME, "none");
@@ -169,8 +176,8 @@ public class MergeWrapperTest {
 
     Assert.assertEquals(output.getAnomalies().size(), 4);
     Assert.assertEquals(output.getLastTimestamp(), 3700);
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250)));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, 0)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300, 1)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 3650)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(3650, 3800)));
   }
@@ -180,10 +187,7 @@ public class MergeWrapperTest {
     this.config.getProperties().put(PROP_MAX_GAP, 200);
     this.config.getProperties().put(PROP_MAX_DURATION, 1250);
 
-    this.outputs.add(new MockPipelineOutput(Arrays.asList(
-        makeAnomaly(2800, 3800),
-        makeAnomaly(3500, 3600)
-    ), 3700));
+    this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(2800, 3800), makeAnomaly(3500, 3600)), 3700));
 
     Map<String, Object> nestedProperties = new HashMap<>();
     nestedProperties.put(PROP_CLASS_NAME, "none");
@@ -196,8 +200,8 @@ public class MergeWrapperTest {
 
     Assert.assertEquals(output.getAnomalies().size(), 4);
     Assert.assertEquals(output.getLastTimestamp(), 3700);
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250)));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, 0)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300, 1)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 3650)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(3650, 3800)));
   }
@@ -206,10 +210,7 @@ public class MergeWrapperTest {
   public void testMergerMaxDurationEnforce() throws Exception {
     this.config.getProperties().put(PROP_MAX_DURATION, 500);
 
-    this.outputs.add(new MockPipelineOutput(Arrays.asList(
-        makeAnomaly(2800, 3800),
-        makeAnomaly(3500, 3600)
-    ), 3700));
+    this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(2800, 3800), makeAnomaly(3500, 3600)), 3700));
 
     Map<String, Object> nestedProperties = new HashMap<>();
     nestedProperties.put(PROP_CLASS_NAME, "none");
@@ -220,19 +221,15 @@ public class MergeWrapperTest {
     this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 4000);
     DetectionPipelineResult output = this.wrapper.run();
 
-    Assert.assertEquals(output.getAnomalies().size(), 8);
+    Assert.assertEquals(output.getAnomalies().size(), 5);
     Assert.assertEquals(output.getLastTimestamp(), 3700);
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 500)));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(500, 1000)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1100, 1250)));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2000)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2300)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2900)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2900, 3400)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(3400, 3800)));
   }
 
-
   @Test
   public void testMergerExecution() throws Exception {
     this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 3000);
@@ -263,15 +260,13 @@ public class MergeWrapperTest {
     this.config.getProperties().put(PROP_MAX_GAP, 200);
     this.config.getProperties().put(PROP_MAX_DURATION, 1250);
 
-    this.outputs.add(new MockPipelineOutput(Arrays.asList(
-        makeAnomaly(1150, 1250, Collections.singletonMap("key", "value")),
-        makeAnomaly(2400, 2800, Collections.singletonMap("otherKey", "value"))
-    ), 3000));
+    this.outputs.add(new MockPipelineOutput(
+        Arrays.asList(makeAnomaly(1150, 1250, Collections.singletonMap("key", "value")),
+            makeAnomaly(2400, 2800, Collections.singletonMap("otherKey", "value"))), 3000));
 
-    this.outputs.add(new MockPipelineOutput(Arrays.asList(
-        makeAnomaly(1250, 1300, Collections.singletonMap("key", "value")),
-        makeAnomaly(2700, 2900, Collections.singletonMap("otherKey", "otherValue"))
-    ), 3000));
+    this.outputs.add(new MockPipelineOutput(
+        Arrays.asList(makeAnomaly(1250, 1300, Collections.singletonMap("key", "value")),
+            makeAnomaly(2700, 2900, Collections.singletonMap("otherKey", "otherValue"))), 3000));
 
     Map<String, Object> nestedPropertiesThree = new HashMap<>();
     nestedPropertiesThree.put(PROP_CLASS_NAME, "none");
@@ -289,12 +284,38 @@ public class MergeWrapperTest {
 
     Assert.assertEquals(output.getAnomalies().size(), 6);
     Assert.assertEquals(output.getLastTimestamp(), 3000);
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250)));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, 0)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300, 1)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1150, 1300, Collections.singletonMap("key", "value"))));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800, Collections.singletonMap("otherKey", "value"))));
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2700, 2900, Collections.singletonMap("otherKey", "otherValue"))));
+    Assert.assertTrue(
+        output.getAnomalies().contains(makeAnomaly(2400, 2800, Collections.singletonMap("otherKey", "value"))));
+    Assert.assertTrue(
+        output.getAnomalies().contains(makeAnomaly(2700, 2900, Collections.singletonMap("otherKey", "otherValue"))));
   }
 
+  @Test
+  public void testMergeProperties() throws Exception {
+
+    MergedAnomalyResultDTO anomaly = makeAnomaly(1100, 1250);
+    String propertyKey = "trend_day1";
+    String propertyValue = "{trend_info}";
+    anomaly.setProperties(Collections.singletonMap(propertyKey, propertyValue));
+
+    this.outputs.add(new MockPipelineOutput(Arrays.asList(anomaly), 3700));
+
+    Map<String, Object> nestedProperties = new HashMap<>();
+    nestedProperties.put(PROP_CLASS_NAME, "none");
+    nestedProperties.put(PROP_METRIC_URN, "thirdeye:metric:1");
+
+    this.nestedProperties.add(nestedProperties);
+
+    this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 4000);
+    DetectionPipelineResult output = this.wrapper.run();
+
+    Assert.assertEquals(output.getAnomalies().size(), 1);
+    Assert.assertEquals(output.getLastTimestamp(), 3700);
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 2800, 0)));
+    Assert.assertTrue(output.getAnomalies().get(0).getProperties().get(propertyKey).equals(propertyValue));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org