You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ak...@apache.org on 2019/02/20 00:18:37 UTC

[incubator-pinot] branch master updated: [TE] Fix bugs in user dashboard endpoint (#3853)

This is an automated email from the ASF dual-hosted git repository.

akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0675aee  [TE] Fix bugs in user dashboard endpoint (#3853)
0675aee is described below

commit 0675aee34f00d7684579c3b18a3e4dd6bff7acc1
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Tue Feb 19 16:18:32 2019 -0800

    [TE] Fix bugs in user dashboard endpoint (#3853)
---
 .../api/user/dashboard/UserDashboardResource.java  | 221 ++++++++++++---------
 .../detection/DetectionMigrationResource.java      |   2 +
 .../resource/v2/UserDashboardResourceTest.java     |  13 +-
 3 files changed, 134 insertions(+), 102 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
index 1e889b0..bbf2e9e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
@@ -42,7 +42,6 @@ import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.thirdeye.api.Constants;
 import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
@@ -65,11 +64,14 @@ import org.apache.pinot.thirdeye.datalayer.util.Predicate;
 import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
 import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
 import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
 import org.apache.pinot.thirdeye.detection.CurrentAndBaselineLoader;
 import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.thirdeye.detection.yaml.YamlDetectionAlertConfigTranslator.*;
+
 
 /**
  * Endpoints for user-customized dashboards (currently alerts only)
@@ -167,8 +169,6 @@ public class UserDashboardResource {
       @QueryParam("fetchTrueAnomaly") @DefaultValue("false") boolean fetchTrueAnomaly,
       @ApiParam(value = "max number of results")
       @QueryParam("limit") Integer limit) throws Exception {
-    Map<String, String> responseMessage = new HashMap<>();
-    List<Predicate> predicates = new ArrayList<>();
     LOG.info("[USER DASHBOARD] Fetching anomalies with filters. Start: " + start + " end: " + end + " metric: "
         + metric + " dataset: " + dataset + " owner: " + owner + " application: " + application + " group: " + group
         + " fetchTrueAnomaly: " + fetchTrueAnomaly + " limit: " + limit);
@@ -178,98 +178,22 @@ public class UserDashboardResource {
       LOG.warn("No upper limit specified while fetching anomalies. Defaulting to " + ANOMALIES_LIMIT_DEFAULT);
       limit = ANOMALIES_LIMIT_DEFAULT;
     }
-
-    // Filter by metric and dataset
-    if (metric != null) {
-      predicates.add(Predicate.EQ("metric", metric));
-    }
-    if (dataset != null) {
-      predicates.add(Predicate.EQ("collection", dataset));
-    }
-
-    // anomaly window start and end
     Preconditions.checkNotNull(start, "Please specify the start time of the anomaly retrieval window");
-    predicates.add(Predicate.GE("endTime", start));
-    if (end != null) {
-      predicates.add(Predicate.LT("startTime", end));
-    }
 
     // TODO support index select on user-reported anomalies
 //    predicates.add(Predicate.OR(
 //        Predicate.EQ("notified", true),
 //        Predicate.EQ("anomalyResultSource", AnomalyResultSource.USER_LABELED_ANOMALY)));
 
-    // application (indirect)
-    Set<Long> applicationFunctionIds = new HashSet<>();
-    if (StringUtils.isNotBlank(application)) {
-      List<AnomalyFunctionDTO> functions = this.functionDAO.findAllByApplication(application);
-      for (AnomalyFunctionDTO function : functions) {
-        if (function.getIsActive()) {
-          applicationFunctionIds.add(function.getId());
-        }
-      }
-    }
-
-    // TODO: deprecate after migration
-    // Support for partially migrated alerts.
-    List<DetectionAlertConfigDTO> notifications = detectionAlertDAO.findByPredicate(Predicate.EQ("application", application));
-    for (DetectionAlertConfigDTO notification : notifications) {
-      applicationFunctionIds.addAll(notification.getVectorClocks().keySet());
-    }
-
-    // group (indirect)
-    Set<Long> groupFunctionIds = new HashSet<>();
-    if (StringUtils.isNotBlank(group)) {
-      AlertConfigDTO alert = this.alertDAO.findWhereNameEquals(group);
-      if (alert != null) {
-        groupFunctionIds.addAll(alert.getEmailConfig().getFunctionIds());
-      }
-    }
-
-    // owner (indirect)
-    Set<Long> ownerFunctionIds = new HashSet<>();
-    if (StringUtils.isNotBlank(owner)) {
-      // TODO: replace database scan with targeted select
-      List<AnomalyFunctionDTO> functions = this.functionDAO.findAll();
-      for (AnomalyFunctionDTO function : functions) {
-        if (Objects.equals(function.getCreatedBy(), owner)) {
-          ownerFunctionIds.add(function.getId());
-        }
-      }
-    }
-
-    // anomaly function ids
-    List<Predicate> oldPredicates = new ArrayList<>(predicates);
-    if (StringUtils.isNotBlank(application) || StringUtils.isNotBlank(group) || StringUtils.isNotBlank(owner)) {
-      Set<Long> functionIds = new HashSet<>();
-      functionIds.addAll(applicationFunctionIds);
-      functionIds.addAll(groupFunctionIds);
-      functionIds.addAll(ownerFunctionIds);
-
-      oldPredicates.add(Predicate.IN("functionId", functionIds.toArray()));
-    }
-
-    // fetch legacy anomalies via predicatesprincipal
-    List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(Predicate.AND(oldPredicates.toArray(new Predicate[oldPredicates.size()])));
-    // filter (un-notified && non-user-reported) anomalies
-    // TODO remove once index select on user-reported anomalies available
-    Iterator<MergedAnomalyResultDTO> itAnomaly = anomalies.iterator();
-    while (itAnomaly.hasNext()) {
-      MergedAnomalyResultDTO anomaly = itAnomaly.next();
-      if (!anomaly.isNotified() &&
-          !AnomalyResultSource.USER_LABELED_ANOMALY.equals(anomaly.getAnomalyResultSource())) {
-        itAnomaly.remove();
-      }
-    }
-
-    // fetch new detection framework anomalies by group
-    anomalies.addAll(fetchFrameworkAnomaliesByGroup(start, end, group));
-
-    // fetch new detection framework anomalies by application
-    anomalies.addAll(fetchFrameworkAnomaliesByApplication(start, end, application));
-
-    // fetch new detection framework anomalies by metric and/or dataset
-    anomalies.addAll(fetchFrameworkAnomaliesByMetricDataset(predicates));
+    // TODO: Prefer to have intersection of anomalies rather than union
+    List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
+    anomalies.addAll(fetchLegacyAnomaliesByFunctionId(start, end, group, application, owner));
+    // Fetch anomalies by group
+    anomalies.addAll(fetchAnomaliesBySubsGroup(start, end, group));
+    // Fetch anomalies by application
+    anomalies.addAll(fetchAnomaliesByApplication(start, end, application));
+    // Fetch anomalies by metric and/or dataset
+    anomalies.addAll(fetchAnomaliesByMetricDataset(start, end, metric, dataset));
 
     // sort descending by start time
     Collections.sort(anomalies, new Comparator<MergedAnomalyResultDTO>() {
@@ -370,13 +294,118 @@ public class UserDashboardResource {
     return output;
   }
 
-  private Collection<? extends MergedAnomalyResultDTO> fetchFrameworkAnomaliesByMetricDataset(
-      List<Predicate> predicates) {
-    predicates.add(Predicate.NEQ("detectionConfigId", 0));
+  @Deprecated
+  private Collection<MergedAnomalyResultDTO> fetchLegacyAnomaliesByFunctionId(Long start, Long end, String group, String application, String owner) {
+    // Legacy lookup: gather ids of active anomaly functions matched by application, subscription group and owner, then return their notified or user-labeled anomalies in the time window.
+    List<Predicate> predicates = new ArrayList<>();
+    Set<Long> functionIds = new HashSet<>();
+
+    // application (indirect): active functions returned by functionDAO.findAllByApplication
+    if (StringUtils.isNotBlank(application)) {
+      List<AnomalyFunctionDTO> functions = this.functionDAO.findAllByApplication(application);
+      for (AnomalyFunctionDTO function : functions) {
+        if (function.getIsActive()) {
+          functionIds.add(function.getId());
+        }
+      }
+    }
+    // Support for partially migrated alerts: ids stored under PROP_DETECTION_CONFIG_IDS are looked up as legacy function ids; unknown or inactive ones are ignored.
+    List<DetectionAlertConfigDTO> notifications = detectionAlertDAO.findByPredicate(Predicate.EQ("application", application)); // NOTE(review): runs even when application is blank/null (outside the guard above) - confirm intended
+    for (DetectionAlertConfigDTO notification : notifications) {
+      for (long id : ConfigUtils.getLongs(notification.getProperties().get(PROP_DETECTION_CONFIG_IDS))) {
+        AnomalyFunctionDTO function = this.functionDAO.findById(id);
+        if (function != null && function.getIsActive()) {
+          functionIds.add(id);
+        }
+      }
+    }
+
+    // group (indirect): active functions listed in the email config of the alert whose name equals the group
+    Set<Long> groupFunctionIds = new HashSet<>();
+    if (StringUtils.isNotBlank(group)) {
+      AlertConfigDTO alert = this.alertDAO.findWhereNameEquals(group);
+      if (alert != null) {
+        for (long id : alert.getEmailConfig().getFunctionIds()) {
+          AnomalyFunctionDTO function = this.functionDAO.findById(id);
+          if (function != null && function.getIsActive()) {
+            groupFunctionIds.add(id);
+          }
+        }
+      }
+    }
+    if (!groupFunctionIds.isEmpty()) { // an empty group match leaves functionIds untouched
+      if (functionIds.isEmpty()) {
+        functionIds = groupFunctionIds; // no ids from the application step: adopt the group's ids
+      } else {
+        functionIds.retainAll(groupFunctionIds); // otherwise intersect with the ids collected so far
+      }
+    }
+
+    // owner (indirect): active functions whose createdBy equals the owner
+    Set<Long> ownerFunctionIds = new HashSet<>();
+    if (StringUtils.isNotBlank(owner)) {
+      // TODO: replace database scan with targeted select
+      List<AnomalyFunctionDTO> functions = this.functionDAO.findAll();
+      for (AnomalyFunctionDTO function : functions) {
+        if (function.getIsActive() && Objects.equals(function.getCreatedBy(), owner)) {
+          ownerFunctionIds.add(function.getId());
+        }
+      }
+    }
+    if (!ownerFunctionIds.isEmpty()) { // an empty owner match leaves functionIds untouched
+      if (functionIds.isEmpty()) { // NOTE(review): also true when the group intersection above emptied functionIds - confirm intended
+        functionIds = ownerFunctionIds;
+      } else {
+        functionIds.retainAll(ownerFunctionIds);
+      }
+    }
+
+    // Predicates: functionId in the collected set, endTime >= start, and (only when end is given) startTime < end.
+    predicates.add(Predicate.IN("functionId", functionIds.toArray())); // NOTE(review): functionIds may be empty here when no filter matched - verify how IN treats an empty array
+    predicates.add(Predicate.GE("endTime", start));
+    if (end != null) {
+      predicates.add(Predicate.LT("startTime", end));
+    }
+
+    // Fetch legacy anomalies matching all predicates (combined with AND)
+    List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(Predicate.AND(predicates.toArray(new Predicate[predicates.size()])));
+    // Drop anomalies that are neither notified nor user-labeled; everything else is kept
+    // TODO remove once index select on user-reported anomalies available
+    Iterator<MergedAnomalyResultDTO> itAnomaly = anomalies.iterator();
+    while (itAnomaly.hasNext()) {
+      MergedAnomalyResultDTO anomaly = itAnomaly.next();
+      if (!anomaly.isNotified() &&
+          !AnomalyResultSource.USER_LABELED_ANOMALY.equals(anomaly.getAnomalyResultSource())) {
+        itAnomaly.remove(); // removes from the list returned by the DAO, via the iterator
+      }
+    }
+
+    return anomalies;
+  }
+
+  private Collection<MergedAnomalyResultDTO> fetchAnomaliesByMetricDataset(Long start, Long end, String metric, String dataset) { // anomalies in the time window filtered by metric and/or dataset
+    if (StringUtils.isBlank(metric) && StringUtils.isBlank(dataset)) { // neither filter supplied: this path contributes nothing
+      return Collections.emptyList();
+    }
+
+    List<Predicate> predicates = new ArrayList<>();
+    predicates.add(Predicate.GE("endTime", start)); // anomaly ends at or after the window start
+    if (end != null) { // the upper bound is optional
+      predicates.add(Predicate.LT("startTime", end));
+    }
+
+    // Filter by metric and dataset; the dataset is matched against the "collection" field
+    if (metric != null) { // NOTE(review): the guard above uses isBlank but this checks only null, so a blank metric still adds an EQ filter - confirm intended
+      predicates.add(Predicate.EQ("metric", metric));
+    }
+    if (dataset != null) {
+      predicates.add(Predicate.EQ("collection", dataset));
+    }
+
     return this.anomalyDAO.findByPredicate(Predicate.AND(predicates.toArray(new Predicate[predicates.size()])));
   }
 
-  private Collection<MergedAnomalyResultDTO> fetchFrameworkAnomaliesByApplication(Long start, Long end, String application) throws Exception {
+  private Collection<MergedAnomalyResultDTO> fetchAnomaliesByApplication(Long start, Long end, String application) throws Exception {
     if (StringUtils.isBlank(application)) {
       return Collections.emptyList();
     }
@@ -389,10 +418,10 @@ public class UserDashboardResource {
       detectionConfigIds.addAll(alertConfigDTO.getVectorClocks().keySet());
     }
 
-    return fetchFrameworkAnomaliesByConfigIds(start, end, detectionConfigIds);
+    return fetchAnomaliesByConfigIds(start, end, detectionConfigIds);
   }
 
-  private Collection<MergedAnomalyResultDTO> fetchFrameworkAnomaliesByGroup(Long start, Long end, String group) throws Exception {
+  private Collection<MergedAnomalyResultDTO> fetchAnomaliesBySubsGroup(Long start, Long end, String group) throws Exception {
     if (StringUtils.isBlank(group)) {
       return Collections.emptyList();
     }
@@ -405,10 +434,10 @@ public class UserDashboardResource {
       detectionConfigIds.addAll(alertConfigDTO.getVectorClocks().keySet());
     }
 
-    return fetchFrameworkAnomaliesByConfigIds(start, end, detectionConfigIds);
+    return fetchAnomaliesByConfigIds(start, end, detectionConfigIds);
   }
 
-  private Collection<MergedAnomalyResultDTO> fetchFrameworkAnomaliesByConfigIds(Long start, Long end, Set<Long> detectionConfigIds) throws Exception {
+  private Collection<MergedAnomalyResultDTO> fetchAnomaliesByConfigIds(Long start, Long end, Set<Long> detectionConfigIds) throws Exception {
     if (detectionConfigIds.isEmpty()) {
       return Collections.emptyList();
     }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
index d7d728a..2b9269f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
@@ -617,6 +617,8 @@ public class DetectionMigrationResource {
   }
 
   @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
   @ApiOperation("migrate all applications")
   @Path("/applications")
   public Response migrateApplication() {
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java
index dc26f0c..3f97f7c 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java
@@ -75,6 +75,7 @@ public class UserDashboardResourceTest {
     this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(100, 500, this.functionIds.get(0), "test_metric", "test_dataset"))); // func A
     this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(800, 1200, this.functionIds.get(0), "test_metric", "test_dataset"))); // func A
     this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(300, 1500, this.functionIds.get(1), "test_metric", "test_dataset"))); // func B
+    this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(300, 1600, this.functionIds.get(2), "test_metric", "test_dataset"))); // func C
     this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(300, 1600, this.functionIds.get(2), "test_metric_2", "test_dataset"))); // func C
 
     for (Long id : this.anomalyIds) {
@@ -124,8 +125,8 @@ public class UserDashboardResourceTest {
   @Test
   public void testAnomaliesByGroup() throws Exception {
     List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, null, "myAlertB", null, null, false, null);
-    Assert.assertEquals(anomalies.size(), 1);
-    Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(3)));
+    Assert.assertEquals(anomalies.size(), 2);
+    Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(3), this.anomalyIds.get(4)));
   }
 
   @Test
@@ -144,15 +145,15 @@ public class UserDashboardResourceTest {
   @Test
   public void testAnomaliesByMetric() throws Exception {
     List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, null, null, "test_metric", "test_dataset", false, null);
-    Assert.assertEquals(anomalies.size(), 2);
-    Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1), this.anomalyIds.get(2)));
+    Assert.assertEquals(anomalies.size(), 3);
+    Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1), this.anomalyIds.get(2), this.anomalyIds.get(3)));
   }
 
   @Test
   public void testAnomaliesByDataset() throws Exception {
     List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, null, null, null, "test_dataset", false, null);
-    Assert.assertEquals(anomalies.size(), 3);
-    Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1), this.anomalyIds.get(2), this.anomalyIds.get(3)));
+    Assert.assertEquals(anomalies.size(), 4);
+    Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1), this.anomalyIds.get(2), this.anomalyIds.get(3), this.anomalyIds.get(4)));
   }
 
   private MergedAnomalyResultDTO makeAnomaly(long start, long end, Long functionId, String metric, String dataset) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org