You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2020/11/30 18:32:03 UTC

[incubator-pinot] branch master updated: [TE] fix changing createdTime of anomalies (#6269)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0dee4d1  [TE] fix changing createdTime of anomalies (#6269)
0dee4d1 is described below

commit 0dee4d1bf2a2fb828ada437e65d86e3f186f9020
Author: Vincent Chen <ji...@linkedin.com>
AuthorDate: Mon Nov 30 10:31:45 2020 -0800

    [TE] fix changing createdTime of anomalies (#6269)
    
    This PR fixes a bug where the createdTime of an existing anomaly is updated when a new anomaly with an earlier startTime is generated, which causes the same anomaly to be sent out twice because of its updated createdTime. The fix is to merge new anomalies into existing anomalies whenever possible, regardless of their startTime.
---
 .../dashboard/resources/SummaryResourceTest.java   |  8 +++++
 .../detection/algorithm/MergeWrapperTest.java      | 19 ++++++++--
 .../pinot/resources/PinotDataSourceResource.java   | 10 ++----
 .../thirdeye/detection/algorithm/MergeWrapper.java | 42 ++++++++++++----------
 4 files changed, 50 insertions(+), 29 deletions(-)

diff --git a/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/dashboard/resources/SummaryResourceTest.java b/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/dashboard/resources/SummaryResourceTest.java
index 8b5dfe7..4b7e46d 100644
--- a/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/dashboard/resources/SummaryResourceTest.java
+++ b/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/dashboard/resources/SummaryResourceTest.java
@@ -1,11 +1,19 @@
 package org.apache.pinot.thirdeye.dashboard.resources;
 
+import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
 import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
 import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 
 public class SummaryResourceTest {
+  private DAOTestBase testDAOProvider;
+
+  @BeforeMethod
+  public void setUp() {
+    testDAOProvider = DAOTestBase.getInstance();
+  }
   @Test
   public void testIsSimpleRatioMetric() {
     // False: null metric config
diff --git a/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java b/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
index c5e98f5..db26454 100644
--- a/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
+++ b/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
@@ -49,6 +49,7 @@ public class MergeWrapperTest {
   private List<MockPipeline> runs;
   private List<MockPipelineOutput> outputs;
   private MockPipelineLoader mockLoader;
+  private List<MergedAnomalyResultDTO> existing;
 
   private static final Long PROP_ID_VALUE = 1000L;
   private static final String PROP_NAME_VALUE = "myName";
@@ -99,10 +100,10 @@ public class MergeWrapperTest {
     this.config.setName(PROP_NAME_VALUE);
     this.config.setProperties(this.properties);
 
-    List<MergedAnomalyResultDTO> existing = new ArrayList<>();
+    this.existing = new ArrayList<>();
     // For existing anomalies add ids.
-    existing.add(setAnomalyId(makeAnomaly(100, 1000), 0));
-    existing.add(setAnomalyId(makeAnomaly(1500, 2000), 1));
+    this.existing.add(setAnomalyId(makeAnomaly(100, 1000), 0));
+    this.existing.add(setAnomalyId(makeAnomaly(1500, 2000), 1));
 
     this.outputs = new ArrayList<>();
 
@@ -141,6 +142,12 @@ public class MergeWrapperTest {
     Assert.assertEquals(output.getLastTimestamp(), 3000);
     // anomalies [100, 1000] and [1150,1250] are merged into [50, 1250]
     Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 1250), 0)));
+    // ensure that the createdTime of anomalies is not changed
+    Assert.assertEquals(
+        output.getAnomalies().stream()
+            .filter(x -> x.equals(setAnomalyId(makeAnomaly(50, 1250), 0))).findFirst().get().getCreatedTime(),
+        existing.stream()
+            .filter(x -> x.equals(setAnomalyId(makeAnomaly(50, 1250), 0))).findFirst().get().getCreatedTime());
     // anomalies [2200, 2300] and [2400, 2800] are merged
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2800)));
   }
@@ -156,6 +163,12 @@ public class MergeWrapperTest {
     Assert.assertEquals(output.getAnomalies().size(), 3);
     Assert.assertEquals(output.getLastTimestamp(), 3000);
     Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 1250), 0)));
+    // ensure that the createdTime of anomalies is not changed
+    Assert.assertEquals(
+        output.getAnomalies().stream()
+            .filter(x -> x.equals(setAnomalyId(makeAnomaly(50, 1250), 0))).findFirst().get().getCreatedTime(),
+        existing.stream()
+            .filter(x -> x.equals(setAnomalyId(makeAnomaly(50, 1250), 0))).findFirst().get().getCreatedTime());
     Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(1500, 2300), 1)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800)));
   }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/resources/PinotDataSourceResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/resources/PinotDataSourceResource.java
index bbcabd1..b0d2fed 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/resources/PinotDataSourceResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/resources/PinotDataSourceResource.java
@@ -30,8 +30,6 @@ import org.apache.pinot.thirdeye.datasource.pinot.PinotThirdEyeDataSource;
 import org.apache.pinot.thirdeye.datasource.pinot.resultset.ThirdEyeResultSet;
 import org.apache.pinot.thirdeye.datasource.pinot.resultset.ThirdEyeResultSetGroup;
 import org.apache.pinot.thirdeye.datasource.pinot.resultset.ThirdEyeResultSetSerializer;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
 import java.util.concurrent.ExecutionException;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -67,14 +65,10 @@ public class PinotDataSourceResource {
    */
   @GET
   @Path("/query")
-  public String executePQL(@QueryParam("pql") String pql, @QueryParam("tableName") String tableName)
-      throws UnsupportedEncodingException {
+  public String executePQL(@QueryParam("pql") String pql, @QueryParam("tableName") String tableName) {
     initPinotDataSource();
-
     String resultString;
-    String decodedPql = URLDecoder.decode(pql, URL_ENCODING);
-    String decodedTableName = URLDecoder.decode(tableName, URL_ENCODING);
-    PinotQuery pinotQuery = new PinotQuery(decodedPql, decodedTableName);
+    PinotQuery pinotQuery = new PinotQuery(pql, tableName);
     try {
       ThirdEyeResultSetGroup thirdEyeResultSetGroup = pinotDataSource.executePQL(pinotQuery);
       resultString = OBJECT_MAPPER.writeValueAsString(thirdEyeResultSetGroup);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
index ee67927..c254b2d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
@@ -66,12 +66,12 @@ public class MergeWrapper extends DetectionPipeline {
   protected static final Comparator<MergedAnomalyResultDTO> COMPARATOR = new Comparator<MergedAnomalyResultDTO>() {
     @Override
     public int compare(MergedAnomalyResultDTO o1, MergedAnomalyResultDTO o2) {
-      // earlier for start time
+      // first order anomalies from earliest startTime to latest
       int res = Long.compare(o1.getStartTime(), o2.getStartTime());
       if (res != 0) return res;
 
-      // later for end time
-      res = Long.compare(o2.getEndTime(), o1.getEndTime());
+      // order anomalies from earliest createdTime to latest, if startTime are the same
+      res = Long.compare(o1.getCreatedTime(), o2.getCreatedTime());
       if (res != 0) return res;
 
       // pre-existing
@@ -190,25 +190,31 @@ public class MergeWrapper extends DetectionPipeline {
         //      parent |-------------------|
         //                                 anomaly |-------------|
         //
-        parent.setEndTime(Math.max(parent.getEndTime(), anomaly.getEndTime()));
-
-        // merge the anomaly's properties into parent
-        ThirdEyeUtils.mergeAnomalyProperties(parent.getProperties(), anomaly.getProperties());
+        // merge new anomaly to existing anomaly
         if (isExistingAnomaly(parent)) {
+          // parent (existing) |---------------------|
+          // anomaly (new)          |-------------------|
+          parent.setEndTime(Math.max(parent.getEndTime(), anomaly.getEndTime()));
+          ThirdEyeUtils.mergeAnomalyProperties(parent.getProperties(), anomaly.getProperties());
+          mergeChildren(parent, anomaly);
           modifiedExistingAnomalies.add(parent);
+        } else if (isExistingAnomaly(anomaly)) {
+          // parent (new)       |---------------------|
+          // anomaly (existing)      |-------------------|
+          anomaly.setStartTime(Math.min(parent.getStartTime(), anomaly.getStartTime()));
+          anomaly.setEndTime(Math.max(parent.getEndTime(), anomaly.getEndTime()));
+          ThirdEyeUtils.mergeAnomalyProperties(anomaly.getProperties(), parent.getProperties());
+          mergeChildren(anomaly, parent);
+          modifiedExistingAnomalies.add(anomaly);
+          retainedNewAnomalies.remove(parent);
+          parents.put(key, anomaly);
         } else {
-          // merge existing anomaly to new anomaly, set id to new anomaly
-          //  parent (new) |-------------------|
-          //         anomaly (existing) |-------------|
-          if (isExistingAnomaly(anomaly)) {
-            parent.setId(anomaly.getId());
-            anomaly.setId(null);
-          }
+          // parent (new)       |---------------------|
+          // anomaly (new)             |-------------------|
+          parent.setEndTime(Math.max(parent.getEndTime(), anomaly.getEndTime()));
+          ThirdEyeUtils.mergeAnomalyProperties(parent.getProperties(), anomaly.getProperties());
+          mergeChildren(parent, anomaly);
         }
-
-        // merge the anomaly's children into the parent
-        mergeChildren(parent, anomaly);
-
       } else if (parent.getEndTime() >= anomaly.getStartTime()) {
         // mergeable but exceeds maxDuration, then truncate
         //      parent |---------------------|


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org