You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ak...@apache.org on 2019/06/28 18:51:44 UTC

[incubator-pinot] branch master updated: [TE] Support Anomaly Hierarchy for Composite alerts (#4371)

This is an automated email from the ASF dual-hosted git repository.

akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 33756ef  [TE] Support Anomaly Hierarchy for Composite alerts (#4371)
33756ef is described below

commit 33756ef933ed6c8f186c69189552045c4e3c3c45
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Fri Jun 28 11:51:40 2019 -0700

    [TE] Support Anomaly Hierarchy for Composite alerts (#4371)
    
    
    Extend anomaly hierarchy to multiple levels
---
 thirdeye/thirdeye-pinot/config/h2db.mv.db          | Bin 2461696 -> 4947968 bytes
 .../anomaly/merge/AnomalyTimeBasedSummarizer.java  |   2 +
 .../bao/jdbc/MergedAnomalyResultManagerImpl.java   | 104 ++++++++++++++-------
 .../bao/TestGroupedAnomalyResultsManager.java      |   6 +-
 .../bao/TestMergedAnomalyResultManager.java        |  46 ++++++++-
 .../pinot/thirdeye/detection/DataProviderTest.java |   2 +
 6 files changed, 125 insertions(+), 35 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/config/h2db.mv.db b/thirdeye/thirdeye-pinot/config/h2db.mv.db
index 7d567a0..a7e598d 100644
Binary files a/thirdeye/thirdeye-pinot/config/h2db.mv.db and b/thirdeye/thirdeye-pinot/config/h2db.mv.db differ
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/merge/AnomalyTimeBasedSummarizer.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/merge/AnomalyTimeBasedSummarizer.java
index 95ff6dd..d6bf3ba 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/merge/AnomalyTimeBasedSummarizer.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/merge/AnomalyTimeBasedSummarizer.java
@@ -19,6 +19,7 @@
 
 package org.apache.pinot.thirdeye.anomaly.merge;
 
+import java.util.HashSet;
 import org.apache.pinot.thirdeye.anomalydetection.context.AnomalyResult;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -92,6 +93,7 @@ public abstract class AnomalyTimeBasedSummarizer {
       LOG.info("Current anomaly start =[{}], end = [{}].", currentResult.getStartTime(), currentResult.getEndTime());
       if (mergedAnomaly == null || currentResult.getEndTime() < mergedAnomaly.getStartTime()) {
         mergedAnomaly = new MergedAnomalyResultDTO(currentResult);
+        mergedAnomaly.setChildIds(new HashSet<>());
       } else {
         // compare current with merged and decide whether to merge the current result or create a new one
         MergedAnomalyResultDTO currAnomaly = new MergedAnomalyResultDTO(currentResult);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
index 26c6415..c66721f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.pinot.thirdeye.datalayer.bao.jdbc;
 
+import com.google.common.base.Preconditions;
 import com.google.inject.Singleton;
 import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFeedbackDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFunctionDTO;
@@ -98,24 +99,82 @@ public class MergedAnomalyResultManagerImpl extends AbstractManagerImpl<MergedAn
       update(mergedAnomalyResultDTO);
       return mergedAnomalyResultDTO.getId();
     }
+    return saveAnomaly(mergedAnomalyResultDTO, new HashSet<>());
+  }
+
+  public int update(MergedAnomalyResultDTO mergedAnomalyResultDTO) {
+    if (mergedAnomalyResultDTO.getId() == null) {
+      Long id = save(mergedAnomalyResultDTO);
+      if (id > 0) {
+        return 1;
+      } else {
+        return 0;
+      }
+    } else {
+      return updateAnomaly(mergedAnomalyResultDTO, new HashSet<>());
+    }
+  }
+
+  private Long saveAnomaly(MergedAnomalyResultDTO mergedAnomalyResultDTO, Set<MergedAnomalyResultDTO> visitedAnomalies) {
+    Preconditions.checkNotNull(mergedAnomalyResultDTO);
+    Preconditions.checkNotNull(visitedAnomalies);
+
+    visitedAnomalies.add(mergedAnomalyResultDTO);
+
+    if (mergedAnomalyResultDTO.getId() != null) {
+      updateAnomaly(mergedAnomalyResultDTO, visitedAnomalies);
+      return mergedAnomalyResultDTO.getId();
+    }
+
     MergedAnomalyResultBean mergeAnomalyBean = convertMergeAnomalyDTO2Bean(mergedAnomalyResultDTO);
+    Set<Long> childAnomalyIds = saveChildAnomalies(mergedAnomalyResultDTO, visitedAnomalies);
+    mergeAnomalyBean.setChildIds(childAnomalyIds);
+
     Long id = genericPojoDao.put(mergeAnomalyBean);
     mergedAnomalyResultDTO.setId(id);
     return id;
   }
 
-  public int update(MergedAnomalyResultDTO mergedAnomalyResultDTO) {
+  private int updateAnomaly(MergedAnomalyResultDTO mergedAnomalyResultDTO, Set<MergedAnomalyResultDTO> visitedAnomalies) {
+    visitedAnomalies.add(mergedAnomalyResultDTO);
+
     if (mergedAnomalyResultDTO.getId() == null) {
-      Long id = save(mergedAnomalyResultDTO);
+      Long id = saveAnomaly(mergedAnomalyResultDTO, visitedAnomalies);
       if (id > 0) {
         return 1;
       } else {
         return 0;
       }
-    } else {
-      MergedAnomalyResultBean mergeAnomalyBean = convertMergeAnomalyDTO2Bean(mergedAnomalyResultDTO);
-      return genericPojoDao.update(mergeAnomalyBean);
     }
+
+    MergedAnomalyResultBean mergeAnomalyBean = convertMergeAnomalyDTO2Bean(mergedAnomalyResultDTO);
+    Set<Long> childAnomalyIds = saveChildAnomalies(mergedAnomalyResultDTO, visitedAnomalies);
+    mergeAnomalyBean.setChildIds(childAnomalyIds);
+
+    return genericPojoDao.update(mergeAnomalyBean);
+  }
+
+  private Set<Long> saveChildAnomalies(MergedAnomalyResultDTO parentAnomaly,
+      Set<MergedAnomalyResultDTO> visitedAnomalies) {
+    Set<Long> childIds = new HashSet<>();
+    Set<MergedAnomalyResultDTO> childAnomalies = parentAnomaly.getChildren();
+    if (childAnomalies == null || childAnomalies.isEmpty()) {
+      // No child anomalies to save
+      return childIds;
+    }
+
+    for (MergedAnomalyResultDTO child : childAnomalies) {
+      if (child.getId() == null) {
+        // Prevent cycles
+        if (visitedAnomalies.contains(child)) {
+          throw new IllegalArgumentException("Loop detected! Child anomaly referencing ancestor");
+        }
+      }
+      child.setChild(true);
+      childIds.add(saveAnomaly(child, visitedAnomalies));
+    }
+
+    return childIds;
   }
 
   @Override
@@ -403,31 +462,6 @@ public class MergedAnomalyResultManagerImpl extends AbstractManagerImpl<MergedAn
       bean.setFunctionId(entity.getFunction().getId());
     }
 
-    if (entity.getChildren() != null && !entity.getChildren().isEmpty()) {
-      Set<Long> childIds = new HashSet<>();
-      for (MergedAnomalyResultDTO child : entity.getChildren()) {
-        if (child.getId() == null) {
-          // only allow single level to prevent cycles
-          if (child == entity){
-            throw new IllegalArgumentException("Cannot contain itself as child anomaly");
-          }
-          if (child.getChildren() != null && !child.getChildren().isEmpty()) {
-            throw new IllegalArgumentException("Multi-level anomaly nesting not supported");
-          }
-        }
-        child.setChild(true);
-        save(child);
-        childIds.add(child.getId());
-      }
-
-      // only allow single level to prevent cycles
-      if (bean.isChild()) {
-        throw new IllegalArgumentException("Multi-level anomaly nesting not supported");
-      }
-
-      bean.setChildIds(childIds);
-    }
-
     return bean;
   }
 
@@ -448,6 +482,12 @@ public class MergedAnomalyResultManagerImpl extends AbstractManagerImpl<MergedAn
       mergedAnomalyResultDTO.setFeedback(anomalyFeedbackDTO);
     }
 
+    mergedAnomalyResultDTO.setChildren(getChildAnomalies(mergedAnomalyResultBean));
+
+    return mergedAnomalyResultDTO;
+  }
+
+  private Set<MergedAnomalyResultDTO> getChildAnomalies(MergedAnomalyResultBean mergedAnomalyResultBean) {
     Set<MergedAnomalyResultDTO> children = new HashSet<>();
     if (mergedAnomalyResultBean.getChildIds() != null) {
       for (Long id : mergedAnomalyResultBean.getChildIds()) {
@@ -460,9 +500,7 @@ public class MergedAnomalyResultManagerImpl extends AbstractManagerImpl<MergedAn
         children.add(child);
       }
     }
-    mergedAnomalyResultDTO.setChildren(children);
-
-    return mergedAnomalyResultDTO;
+    return children;
   }
 
   @Override
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/bao/TestGroupedAnomalyResultsManager.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/bao/TestGroupedAnomalyResultsManager.java
index a447992..bf4142f 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/bao/TestGroupedAnomalyResultsManager.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/bao/TestGroupedAnomalyResultsManager.java
@@ -16,6 +16,7 @@
 
 package org.apache.pinot.thirdeye.datalayer.bao;
 
+import java.util.HashSet;
 import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
 import org.apache.pinot.thirdeye.datalayer.dto.GroupedAnomalyResultsDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
@@ -101,9 +102,11 @@ public class TestGroupedAnomalyResultsManager {
   public Object[][] groupedAnomalies() {
     MergedAnomalyResultDTO mergedAnomalyResultDTO1 = new MergedAnomalyResultDTO();
     mergedAnomalyResultDTO1.setEndTime(10);
+    mergedAnomalyResultDTO1.setChildIds(new HashSet<>());
 
     MergedAnomalyResultDTO mergedAnomalyResultDTO2 = new MergedAnomalyResultDTO();
     mergedAnomalyResultDTO2.setEndTime(15);
+    mergedAnomalyResultDTO2.setChildIds(new HashSet<>());
 
     Long mergedAnomalyResultDTO1Id = mergedAnomalyResultDAO.save(mergedAnomalyResultDTO1);
     mergedAnomalyResultDTO1.setId(mergedAnomalyResultDTO1Id);
@@ -114,12 +117,13 @@ public class TestGroupedAnomalyResultsManager {
     mergedAnomalyResultsSet1.add(mergedAnomalyResultDTO2);
     mergedAnomalyResultsSet1.add(mergedAnomalyResultDTO1);
 
-
     MergedAnomalyResultDTO mergedAnomalyResultDTO3 = new MergedAnomalyResultDTO();
     mergedAnomalyResultDTO3.setEndTime(20);
+    mergedAnomalyResultDTO3.setChildIds(new HashSet<>());
 
     MergedAnomalyResultDTO mergedAnomalyResultDTO4 = new MergedAnomalyResultDTO();
     mergedAnomalyResultDTO4.setEndTime(25);
+    mergedAnomalyResultDTO4.setChildIds(new HashSet<>());
 
     Long mergedAnomalyResultDTO3Id = mergedAnomalyResultDAO.save(mergedAnomalyResultDTO3);
     mergedAnomalyResultDTO3.setId(mergedAnomalyResultDTO3Id);
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/bao/TestMergedAnomalyResultManager.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/bao/TestMergedAnomalyResultManager.java
index d710e2b..fe85c0b 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/bao/TestMergedAnomalyResultManager.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/bao/TestMergedAnomalyResultManager.java
@@ -161,7 +161,7 @@ public class TestMergedAnomalyResultManager{
   }
 
   @Test
-  public void testLoadChildren() {
+  public void testSaveAndLoadHierarchicalAnomalies() {
     MergedAnomalyResultDTO parent = new MergedAnomalyResultDTO();
     parent.setStartTime(1000);
     parent.setEndTime(2000);
@@ -174,6 +174,11 @@ public class TestMergedAnomalyResultManager{
     child2.setStartTime(1500);
     child2.setEndTime(2000);
 
+    MergedAnomalyResultDTO child3 = new MergedAnomalyResultDTO();
+    child3.setStartTime(1600);
+    child3.setEndTime(1800);
+
+    child2.setChildren(new HashSet<>(Arrays.asList(child3)));
     parent.setChildren(new HashSet<>(Arrays.asList(child1, child2)));
 
     long parentId = this.mergedAnomalyResultDAO.save(parent);
@@ -203,5 +208,44 @@ public class TestMergedAnomalyResultManager{
     Assert.assertTrue(readChildren.get(1).isChild());
     Assert.assertEquals(readChildren.get(1).getStartTime(), 1500);
     Assert.assertEquals(readChildren.get(1).getEndTime(), 2000);
+    Assert.assertEquals(readChildren.get(1).getChildren().size(), 1);
+    Assert.assertEquals(readChildren.get(1).getChildren().iterator().next().getStartTime(), 1600);
+    Assert.assertEquals(readChildren.get(1).getChildren().iterator().next().getEndTime(), 1800);
+  }
+
+  @Test
+  public void testUpdateToAnomalyHierarchy() {
+    MergedAnomalyResultDTO parent = new MergedAnomalyResultDTO();
+    parent.setStartTime(1000);
+    parent.setEndTime(2000);
+
+    MergedAnomalyResultDTO child1 = new MergedAnomalyResultDTO();
+    child1.setStartTime(1000);
+    child1.setEndTime(1500);
+
+    MergedAnomalyResultDTO child2 = new MergedAnomalyResultDTO();
+    child2.setStartTime(1500);
+    child2.setEndTime(2000);
+
+    MergedAnomalyResultDTO child3 = new MergedAnomalyResultDTO();
+    child3.setStartTime(1600);
+    child3.setEndTime(1800);
+
+    child1.setChildren(new HashSet<>(Arrays.asList(child2)));
+    parent.setChildren(new HashSet<>(Arrays.asList(child1)));
+
+    this.mergedAnomalyResultDAO.save(parent);
+
+    child2.setChildren(new HashSet<>(Arrays.asList(child3)));
+
+    this.mergedAnomalyResultDAO.save(parent);
+
+    MergedAnomalyResultDTO read = this.mergedAnomalyResultDAO.findById(parent.getId());
+    Assert.assertFalse(read.getChildren().isEmpty());
+    Assert.assertEquals(read.getChildren().iterator().next().getStartTime(), 1000);
+    Assert.assertFalse(read.getChildren().iterator().next().getChildren().isEmpty());
+    Assert.assertEquals(read.getChildren().iterator().next().getChildren().iterator().next().getStartTime(), 1500);
+    Assert.assertFalse(read.getChildren().iterator().next().getChildren().iterator().next().getChildren().isEmpty());
+    Assert.assertEquals(read.getChildren().iterator().next().getChildren().iterator().next().getChildren().iterator().next().getStartTime(), 1600);
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
index d3a65d7..6580d1f 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
@@ -275,6 +276,7 @@ public class DataProviderTest {
     anomaly.setStartTime(start);
     anomaly.setEndTime(end);
     anomaly.setId(id);
+    anomaly.setChildIds(new HashSet<>());
 
     DimensionMap filters = new DimensionMap();
     for (String fs : filterStrings) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org