You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ak...@apache.org on 2019/02/13 23:52:58 UTC

[incubator-pinot] branch master updated: [TE] Extend anomalies endpoint to fetch by metric/dataset and true anomalies (#3832)

This is an automated email from the ASF dual-hosted git repository.

akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e518d0a  [TE] Extend anomalies endpoint to fetch by metric/dataset and true anomalies (#3832)
e518d0a is described below

commit e518d0a0fae3e4045ac4a48445d868b247e014d1
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Wed Feb 13 15:52:54 2019 -0800

    [TE] Extend anomalies endpoint to fetch by metric/dataset and true anomalies (#3832)
---
 .../user/dashboard}/UserDashboardResource.java     | 163 +++++++++++++--------
 .../dashboard/ThirdEyeDashboardApplication.java    |   2 +-
 .../resource/v2/UserDashboardResourceTest.java     |  42 ++++--
 3 files changed, 133 insertions(+), 74 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/UserDashboardResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
similarity index 81%
rename from thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/UserDashboardResource.java
rename to thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
index 377863e..1e889b0 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/UserDashboardResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
@@ -17,32 +17,10 @@
  * under the License.
  */
 
-package org.apache.pinot.thirdeye.dashboard.resources.v2;
+package org.apache.pinot.thirdeye.api.user.dashboard;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Collections2;
-import org.apache.pinot.thirdeye.api.Constants;
-import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
-import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
-import org.apache.pinot.thirdeye.datalayer.bao.AlertConfigManager;
-import org.apache.pinot.thirdeye.datalayer.dto.AlertConfigDTO;
-import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
-import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
-import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
-import org.apache.pinot.thirdeye.datalayer.bao.AnomalyFunctionManager;
-import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
-import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
-import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
-import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
-import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
-import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFunctionDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.datalayer.util.Predicate;
-import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
-import org.apache.pinot.thirdeye.detection.CurrentAndBaselineLoader;
-import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
 import com.wordnik.swagger.annotations.Api;
 import com.wordnik.swagger.annotations.ApiOperation;
 import com.wordnik.swagger.annotations.ApiParam;
@@ -58,12 +36,39 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import javax.annotation.Nullable;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.thirdeye.api.Constants;
+import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
+import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.ResourceUtils;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
+import org.apache.pinot.thirdeye.datalayer.bao.AlertConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.AnomalyFunctionManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
+import org.apache.pinot.thirdeye.datalayer.dto.AlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFunctionDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.detection.CurrentAndBaselineLoader;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -73,6 +78,8 @@ import org.apache.commons.lang.StringUtils;
 @Path(value = "/userdashboard")
 @Produces(MediaType.APPLICATION_JSON)
 public class UserDashboardResource {
+  protected static final Logger LOG = LoggerFactory.getLogger(UserDashboardResource.class);
+
   private static final int ANOMALIES_LIMIT_DEFAULT = 500;
 
   private final MergedAnomalyResultManager anomalyDAO;
@@ -152,18 +159,40 @@ public class UserDashboardResource {
       @QueryParam("application") String application,
       @ApiParam(value = "subscription group")
       @QueryParam("group") String group,
+      @ApiParam(value = "The name of the metric to fetch anomalies from")
+      @QueryParam("metric") String metric,
+      @ApiParam(value = "The name of the pinot table to which this metric belongs")
+      @QueryParam("dataset") String dataset,
+      @ApiParam(value = "Specify if you want to only fetch true anomalies")
+      @QueryParam("fetchTrueAnomaly") @DefaultValue("false") boolean fetchTrueAnomaly,
       @ApiParam(value = "max number of results")
       @QueryParam("limit") Integer limit) throws Exception {
+    Map<String, String> responseMessage = new HashMap<>();
+    List<Predicate> predicates = new ArrayList<>();
+    LOG.info("[USER DASHBOARD] Fetching anomalies with filters. Start: " + start + " end: " + end + " metric: "
+        + metric + " dataset: " + dataset + " owner: " + owner + " application: " + application + " group: " + group
+        + " fetchTrueAnomaly: " + fetchTrueAnomaly + " limit: " + limit);
 
-    //
-    // query params
-    //
+    // Safety conditions
     if (limit == null) {
+      LOG.warn("No upper limit specified while fetching anomalies. Defaulting to " + ANOMALIES_LIMIT_DEFAULT);
       limit = ANOMALIES_LIMIT_DEFAULT;
     }
 
-    // safety condition
-    Preconditions.checkNotNull(start);
+    // Filter by metric and dataset
+    if (metric != null) {
+      predicates.add(Predicate.EQ("metric", metric));
+    }
+    if (dataset != null) {
+      predicates.add(Predicate.EQ("collection", dataset));
+    }
+
+    // anomaly window start and end
+    Preconditions.checkNotNull(start, "Please specify the start time of the anomaly retrieval window");
+    predicates.add(Predicate.GE("endTime", start));
+    if (end != null) {
+      predicates.add(Predicate.LT("startTime", end));
+    }
 
     // TODO support index select on user-reported anomalies
 //    predicates.add(Predicate.OR(
@@ -210,34 +239,18 @@ public class UserDashboardResource {
     }
 
     // anomaly function ids
-    List<Predicate> predicates = new ArrayList<>();
-
+    List<Predicate> oldPredicates = new ArrayList<>(predicates);
     if (StringUtils.isNotBlank(application) || StringUtils.isNotBlank(group) || StringUtils.isNotBlank(owner)) {
       Set<Long> functionIds = new HashSet<>();
       functionIds.addAll(applicationFunctionIds);
       functionIds.addAll(groupFunctionIds);
       functionIds.addAll(ownerFunctionIds);
 
-      predicates.add(Predicate.IN("functionId", functionIds.toArray()));
+      oldPredicates.add(Predicate.IN("functionId", functionIds.toArray()));
     }
 
-    // anomaly window start
-    if (start != null) {
-      predicates.add(Predicate.GT("endTime", start));
-    }
-
-    // anomaly window end
-    if (end != null) {
-      predicates.add(Predicate.LT("startTime", end));
-    }
-
-    //
-    // fetch anomalies
-    //
-
-    // fetch legacy anomalies via predicates
-    List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(Predicate.AND(predicates.toArray(new Predicate[predicates.size()])));
-
+    // fetch legacy anomalies via predicates
+    List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(Predicate.AND(oldPredicates.toArray(new Predicate[oldPredicates.size()])));
     // filter (un-notified && non-user-reported) anomalies
     // TODO remove once index select on user-reported anomalies available
     Iterator<MergedAnomalyResultDTO> itAnomaly = anomalies.iterator();
@@ -250,14 +263,13 @@ public class UserDashboardResource {
     }
 
     // fetch new detection framework anomalies by group
-    if (StringUtils.isNotBlank(group)) {
-      anomalies.addAll(fetchFrameworkAnomaliesByGroup(start, end, group));
-    }
+    anomalies.addAll(fetchFrameworkAnomaliesByGroup(start, end, group));
 
     // fetch new detection framework anomalies by application
-    if (StringUtils.isNotBlank(application)) {
-      anomalies.addAll(fetchFrameworkAnomaliesByApplication(start, end, application));
-    }
+    anomalies.addAll(fetchFrameworkAnomaliesByApplication(start, end, application));
+
+    // fetch new detection framework anomalies by metric and/or dataset
+    anomalies.addAll(fetchFrameworkAnomaliesByMetricDataset(predicates));
 
     // sort descending by start time
     Collections.sort(anomalies, new Comparator<MergedAnomalyResultDTO>() {
@@ -267,29 +279,42 @@ public class UserDashboardResource {
       }
     });
 
+    if (fetchTrueAnomaly) {
+      // Filter and retain only true anomalies
+      List<MergedAnomalyResultDTO> trueAnomalies = new ArrayList<>();
+      for (MergedAnomalyResultDTO anomaly : anomalies) {
+        if (anomaly.getFeedback() != null && anomaly.getFeedback().getFeedbackType().isAnomaly()) {
+          trueAnomalies.add(anomaly);
+        }
+      }
+      anomalies = trueAnomalies;
+    }
+
     // limit result size
     anomalies = anomalies.subList(0, Math.min(anomalies.size(), limit));
 
-    //
-    // fetch functions
-    //
+    List<AnomalySummary> output = getAnomalyFormattedOutput(anomalies);
+
+    LOG.info("Successfully returned " + output.size() + " anomalies.");
+    return output;
+  }
+
+  private List<AnomalySummary> getAnomalyFormattedOutput(List<MergedAnomalyResultDTO> anomalies) {
+    List<AnomalySummary> output = new ArrayList<>();
+
+    // fetch functions & build function id to function object mapping
     Set<Long> anomalyFunctionIds = new HashSet<>();
     for (MergedAnomalyResultDTO anomaly : anomalies) {
       if (anomaly.getFunctionId() != null) {
         anomalyFunctionIds.add(anomaly.getFunctionId());
       }
     }
-
     List<AnomalyFunctionDTO> functions = this.functionDAO.findByPredicate(Predicate.IN("baseId", anomalyFunctionIds.toArray()));
     Map<Long, AnomalyFunctionDTO> id2function = new HashMap<>();
     for (AnomalyFunctionDTO function : functions) {
       id2function.put(function.getId(), function);
     }
 
-    //
-    // format output
-    //
-    List<AnomalySummary> output = new ArrayList<>();
     for (MergedAnomalyResultDTO anomaly : anomalies) {
       long metricId = this.getMetricId(anomaly);
 
@@ -345,7 +370,17 @@ public class UserDashboardResource {
     return output;
   }
 
+  private Collection<? extends MergedAnomalyResultDTO> fetchFrameworkAnomaliesByMetricDataset(
+      List<Predicate> predicates) {
+    predicates.add(Predicate.NEQ("detectionConfigId", 0));
+    return this.anomalyDAO.findByPredicate(Predicate.AND(predicates.toArray(new Predicate[predicates.size()])));
+  }
+
   private Collection<MergedAnomalyResultDTO> fetchFrameworkAnomaliesByApplication(Long start, Long end, String application) throws Exception {
+    if (StringUtils.isBlank(application)) {
+      return Collections.emptyList();
+    }
+
     List<DetectionAlertConfigDTO> alerts =
         this.detectionAlertDAO.findByPredicate(Predicate.EQ("application", application));
 
@@ -358,6 +393,10 @@ public class UserDashboardResource {
   }
 
   private Collection<MergedAnomalyResultDTO> fetchFrameworkAnomaliesByGroup(Long start, Long end, String group) throws Exception {
+    if (StringUtils.isBlank(group)) {
+      return Collections.emptyList();
+    }
+
     List<DetectionAlertConfigDTO> alerts =
         this.detectionAlertDAO.findByPredicate(Predicate.EQ("name", group));
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
index c641052..db8d237 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
@@ -60,7 +60,7 @@ import org.apache.pinot.thirdeye.dashboard.resources.v2.RootCauseMetricResource;
 import org.apache.pinot.thirdeye.dashboard.resources.v2.RootCauseResource;
 import org.apache.pinot.thirdeye.dashboard.resources.v2.RootCauseSessionResource;
 import org.apache.pinot.thirdeye.dashboard.resources.v2.TimeSeriesResource;
-import org.apache.pinot.thirdeye.dashboard.resources.v2.UserDashboardResource;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
 import org.apache.pinot.thirdeye.dashboard.resources.v2.rootcause.DefaultEntityFormatter;
 import org.apache.pinot.thirdeye.dashboard.resources.v2.rootcause.FormatterLoader;
 import org.apache.pinot.thirdeye.dataset.DatasetAutoOnboardResource;
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java
index f625e68..dc26f0c 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java
@@ -1,6 +1,9 @@
 package org.apache.pinot.thirdeye.dashboard.resource.v2;
 
-import org.apache.pinot.thirdeye.dashboard.resources.v2.UserDashboardResource;
+import org.apache.pinot.thirdeye.anomalydetection.context.AnomalyFeedback;
+import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
+import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
 import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomalySummary;
 import org.apache.pinot.thirdeye.datalayer.bao.AlertConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.AnomalyFunctionManager;
@@ -11,6 +14,7 @@ import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
 import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
 import org.apache.pinot.thirdeye.datalayer.dto.AlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFeedbackDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFunctionDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.datalayer.pojo.AlertConfigBean;
@@ -68,10 +72,10 @@ public class UserDashboardResourceTest {
     // anomalies
     this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
     this.anomalyIds = new ArrayList<>();
-    this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(100, 500, this.functionIds.get(0)))); // func A
-    this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(700, 1200, this.functionIds.get(0)))); // func A
-    this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(300, 1500, this.functionIds.get(1)))); // func B
-    this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(300, 1600, this.functionIds.get(2)))); // func C
+    this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(100, 500, this.functionIds.get(0), "test_metric", "test_dataset"))); // func A
+    this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(800, 1200, this.functionIds.get(0), "test_metric", "test_dataset"))); // func A
+    this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(300, 1500, this.functionIds.get(1), "test_metric", "test_dataset"))); // func B
+    this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(300, 1600, this.functionIds.get(2), "test_metric_2", "test_dataset"))); // func C
 
     for (Long id : this.anomalyIds) {
       Assert.assertNotNull(id);
@@ -106,41 +110,57 @@ public class UserDashboardResourceTest {
 
   @Test
   public void testAnomaliesByApplication() throws Exception {
-    List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, "myApplicationA", null, null);
+    List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, "myApplicationA", null, null, null, false, null);
     Assert.assertEquals(anomalies.size(), 2);
     Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1), this.anomalyIds.get(2)));
   }
 
   @Test
   public void testAnomaliesByApplicationInvalid() throws Exception {
-    List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, "Invalid", null, null);
+    List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, "Invalid", null, null, null, false, null);
     Assert.assertEquals(anomalies.size(), 0);
   }
 
   @Test
   public void testAnomaliesByGroup() throws Exception {
-    List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, null, "myAlertB", null);
+    List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, null, "myAlertB", null, null, false, null);
     Assert.assertEquals(anomalies.size(), 1);
     Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(3)));
   }
 
   @Test
   public void testAnomaliesByGroupInvalid() throws Exception {
-    List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, null, "Invalid", null);
+    List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, null, "Invalid", null, null, false, null);
     Assert.assertEquals(anomalies.size(), 0);
   }
 
   @Test
   public void testAnomaliesLimit() throws Exception {
-    List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, "myApplicationA", null, 1);
+    List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, "myApplicationA", null, null, null, false, 1);
     Assert.assertEquals(anomalies.size(), 1);
     Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1)));
   }
 
-  private MergedAnomalyResultDTO makeAnomaly(long start, long end, Long functionId) {
+  @Test
+  public void testAnomaliesByMetric() throws Exception {
+    List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, null, null, "test_metric", "test_dataset", false, null);
+    Assert.assertEquals(anomalies.size(), 2);
+    Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1), this.anomalyIds.get(2)));
+  }
+
+  @Test
+  public void testAnomaliesByDataset() throws Exception {
+    List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, null, null, null, "test_dataset", false, null);
+    Assert.assertEquals(anomalies.size(), 3);
+    Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1), this.anomalyIds.get(2), this.anomalyIds.get(3)));
+  }
+
+  private MergedAnomalyResultDTO makeAnomaly(long start, long end, Long functionId, String metric, String dataset) {
     MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
     anomaly.setStartTime(start);
     anomaly.setEndTime(end);
+    anomaly.setMetric(metric);
+    anomaly.setCollection(dataset);
     anomaly.setFunctionId(functionId);
     anomaly.setNotified(true);
     return anomaly;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org