You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2020/07/29 23:17:33 UTC

[incubator-pinot] branch anomalies-pagination updated (b4e0733 -> 2a2be35)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a change to branch anomalies-pagination
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard b4e0733  add java docs
 discard d37df19  anomaly searcher
 discard 39fcef4  new anomalies endpoint
     new 2a2be35  new anomalies endpoint

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (b4e0733)
            \
             N -- N -- N   refs/heads/anomalies-pagination (2a2be35)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: new anomalies endpoint

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch anomalies-pagination
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 2a2be355b869110ebcfcf8e0c8af335a03c39415
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Wed Jul 22 14:22:33 2020 -0700

    new anomalies endpoint
---
 .../thirdeye/constant/AnomalyFeedbackType.java     |   2 +-
 .../dashboard/ThirdEyeDashboardApplication.java    |   2 +
 .../v2/anomalies/AnomalySearchFilter.java          | 114 +++++++++++++++++
 .../v2/anomalies/AnomalySearchResource.java        |  61 +++++++++
 .../resources/v2/anomalies/AnomalySearcher.java    | 142 +++++++++++++++++++++
 .../datalayer/entity/MergedAnomalyResultIndex.java |   9 ++
 .../src/main/resources/schema/create-schema.sql    |   1 +
 7 files changed, 330 insertions(+), 1 deletion(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyFeedbackType.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyFeedbackType.java
index c6c7bfa..93675dd 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyFeedbackType.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalyFeedbackType.java
@@ -47,4 +47,4 @@ public enum AnomalyFeedbackType {
   public boolean isUnresolved() {
     return this.equals(NO_FEEDBACK);
   }
-}
+}
\ No newline at end of file
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
index c3decbc..20e547a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
@@ -59,6 +59,7 @@ import org.apache.pinot.thirdeye.dashboard.resources.v2.RootCauseMetricResource;
 import org.apache.pinot.thirdeye.dashboard.resources.v2.RootCauseResource;
 import org.apache.pinot.thirdeye.dashboard.resources.v2.RootCauseSessionResource;
 import org.apache.pinot.thirdeye.api.user.dashboard.UserDashboardResource;
+import org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies.AnomalySearchResource;
 import org.apache.pinot.thirdeye.dashboard.resources.v2.rootcause.DefaultEntityFormatter;
 import org.apache.pinot.thirdeye.dashboard.resources.v2.rootcause.FormatterLoader;
 import org.apache.pinot.thirdeye.dataset.DatasetAutoOnboardResource;
@@ -188,6 +189,7 @@ public class ThirdEyeDashboardApplication
         config.getAlertOnboardingPermitPerSecond()));
     env.jersey().register(new SqlDataSourceResource());
     env.jersey().register(new AlertResource());
+    env.jersey().register(new AnomalySearchResource());
 
     TimeSeriesLoader timeSeriesLoader = new DefaultTimeSeriesLoader(
         DAO_REGISTRY.getMetricConfigDAO(), DAO_REGISTRY.getDatasetConfigDAO(),
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchFilter.java
new file mode 100644
index 0000000..e995e5a
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchFilter.java
@@ -0,0 +1,114 @@
+package org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies;
+
+import java.util.List;
+
+
+/**
+ * Immutable value object carrying the criteria of an anomaly search.
+ */
+public class AnomalySearchFilter {
+  private final Long startTime;
+  private final Long endTime;
+  private final List<String> feedbacks;
+  private final List<String> subscriptionGroups;
+  private final List<String> detectionNames;
+  private final List<String> metrics;
+  private final List<String> datasets;
+  private final List<Long> anomalyIds;
+
+  /**
+   * Creates a filter from the given criteria; every argument is stored as passed in, without copying.
+   *
+   * @param startTime the start time of the search range
+   * @param endTime the end time of the search range
+   * @param feedbacks the feedback type names to match
+   * @param subscriptionGroups the subscription group names to match
+   * @param detectionNames the detection names to match
+   * @param metrics the metric names to match
+   * @param datasets the dataset names to match
+   * @param anomalyIds the anomaly ids to match
+   */
+  public AnomalySearchFilter(Long startTime, Long endTime, List<String> feedbacks, List<String> subscriptionGroups, List<String> detectionNames,
+      List<String> metrics, List<String> datasets, List<Long> anomalyIds) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.feedbacks = feedbacks;
+    this.subscriptionGroups = subscriptionGroups;
+    this.detectionNames = detectionNames;
+    this.metrics = metrics;
+    this.datasets = datasets;
+    this.anomalyIds = anomalyIds;
+  }
+
+  /**
+   * Returns the start time given at construction.
+   *
+   * @return the start time
+   */
+  public Long getStartTime() {
+    return this.startTime;
+  }
+
+  /**
+   * Returns the end time given at construction.
+   *
+   * @return the end time
+   */
+  public Long getEndTime() {
+    return this.endTime;
+  }
+
+  /**
+   * Returns the feedback type names given at construction.
+   *
+   * @return the feedbacks
+   */
+  public List<String> getFeedbacks() {
+    return this.feedbacks;
+  }
+
+  /**
+   * Returns the subscription group names given at construction.
+   *
+   * @return the subscription groups
+   */
+  public List<String> getSubscriptionGroups() {
+    return this.subscriptionGroups;
+  }
+
+  /**
+   * Returns the detection names given at construction.
+   *
+   * @return the detection names
+   */
+  public List<String> getDetectionNames() {
+    return this.detectionNames;
+  }
+
+  /**
+   * Returns the metric names given at construction.
+   *
+   * @return the metrics
+   */
+  public List<String> getMetrics() {
+    return this.metrics;
+  }
+
+  /**
+   * Returns the dataset names given at construction.
+   *
+   * @return the datasets
+   */
+  public List<String> getDatasets() {
+    return this.datasets;
+  }
+
+  /**
+   * Returns the anomaly ids given at construction.
+   *
+   * @return the anomaly ids
+   */
+  public List<Long> getAnomalyIds() {
+    return this.anomalyIds;
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchResource.java
new file mode 100644
index 0000000..81b51a9
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearchResource.java
@@ -0,0 +1,61 @@
+package org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import java.util.List;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.thirdeye.api.Constants;
+
+
+/**
+ * REST resource serving paginated anomaly searches under the {@code /anomaly-search} path.
+ */
+@Path(value = "/anomaly-search")
+@Produces(MediaType.APPLICATION_JSON)
+@Api(tags = {Constants.DETECTION_TAG})
+public class AnomalySearchResource {
+
+  private final AnomalySearcher anomalySearcher;
+
+  /**
+   * Creates the resource together with the {@link AnomalySearcher} it delegates to.
+   */
+  public AnomalySearchResource() {
+    this.anomalySearcher = new AnomalySearcher();
+  }
+
+  /**
+   * Wraps the query parameters in an {@link AnomalySearchFilter} and returns the page found by the searcher.
+   *
+   * @param limit the page size, query parameter {@code limit}, 10 when absent
+   * @param offset the page offset, query parameter {@code offset}, 0 when absent
+   * @param startTime the start time, query parameter {@code startTime}
+   * @param endTime the end time, query parameter {@code endTime}
+   * @param feedbacks the feedback types, e.g. ANOMALY, NOT_ANOMALY, query parameter {@code feedbackStatus}
+   * @param subscriptionGroups the subscription groups, query parameter {@code subscriptionGroup}
+   * @param detectionNames the detection names, query parameter {@code detectionName}
+   * @param metrics the metrics, query parameter {@code metric}
+   * @param datasets the datasets, query parameter {@code dataset}
+   * @param anomalyIds the anomaly ids, query parameter {@code anomalyId}
+   * @return an OK response whose entity is the result returned by {@link AnomalySearcher#search}
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation("Search and paginate anomalies according to the parameters")
+  public Response findAlerts(@QueryParam("limit") @DefaultValue("10") int limit,
+      @QueryParam("offset") @DefaultValue("0") int offset, @QueryParam("startTime") Long startTime,
+      @QueryParam("endTime") Long endTime, @QueryParam("feedbackStatus") List<String> feedbacks,
+      @QueryParam("subscriptionGroup") List<String> subscriptionGroups,
+      @QueryParam("detectionName") List<String> detectionNames, @QueryParam("metric") List<String> metrics,
+      @QueryParam("dataset") List<String> datasets, @QueryParam("anomalyId") List<Long> anomalyIds) {
+    AnomalySearchFilter filter = new AnomalySearchFilter(startTime, endTime, feedbacks, subscriptionGroups,
+        detectionNames, metrics, datasets, anomalyIds);
+    return Response.ok().entity(this.anomalySearcher.search(filter, limit, offset)).build();
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearcher.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearcher.java
new file mode 100644
index 0000000..463cbe3
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/anomalies/AnomalySearcher.java
@@ -0,0 +1,142 @@
+package org.apache.pinot.thirdeye.dashboard.resources.v2.anomalies;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import org.apache.pinot.thirdeye.datalayer.dto.AbstractDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.pojo.DetectionConfigBean;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+
+import static org.apache.pinot.thirdeye.constant.AnomalyFeedbackType.*;
+
+
+/**
+ * Searches merged anomalies by time range, detection, subscription group, dataset, metric, id and feedback.
+ */
+public class AnomalySearcher {
+  private final MergedAnomalyResultManager anomalyDAO;
+  private final DetectionConfigManager detectionConfigDAO;
+  private final DetectionAlertConfigManager detectionAlertConfigDAO;
+
+  /**
+   * Instantiates a new Anomaly searcher using the DAOs obtained from {@link DAORegistry}.
+   */
+  public AnomalySearcher() {
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+    this.detectionConfigDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.detectionAlertConfigDAO = DAORegistry.getInstance().getDetectionAlertConfigManager();
+  }
+
+  /**
+   * Finds the anomalies with {@code child = false} that match the filter, ordered by descending id, and pages them.
+   * The end time bounds the anomaly start and the start time bounds the anomaly end; each bound is optional.
+   * @param searchFilter the search filter; its list getters are dereferenced and must not return null
+   * @param limit the maximum number of elements in the returned page
+   * @param offset the index, within the ordered matches, of the first element of the page
+   * @return a map with "count" (total matches), "limit", "offset" and "elements" (the requested page)
+   */
+  public Map<String, Object> search(AnomalySearchFilter searchFilter, int limit, int offset) {
+    Predicate predicate = Predicate.EQ("child", false);
+    if (searchFilter.getEndTime() != null) { // guard the bound that is actually used below
+      predicate = Predicate.AND(predicate, Predicate.LT("startTime", searchFilter.getEndTime()));
+    }
+    if (searchFilter.getStartTime() != null) {
+      predicate = Predicate.AND(predicate, Predicate.GT("endTime", searchFilter.getStartTime()));
+    }
+    // search by detections or subscription groups
+    Set<Long> detectionConfigIds = new HashSet<>();
+    Set<Long> subscribedDetectionConfigIds = new HashSet<>();
+    if (!searchFilter.getDetectionNames().isEmpty()) {
+      detectionConfigIds =
+          this.detectionConfigDAO.findByPredicate(Predicate.IN("name", searchFilter.getDetectionNames().toArray()))
+              .stream()
+              .map(DetectionConfigBean::getId)
+              .collect(Collectors.toSet());
+    }
+    if (!searchFilter.getSubscriptionGroups().isEmpty()) {
+      subscribedDetectionConfigIds = this.detectionAlertConfigDAO.findByPredicate(
+          Predicate.IN("name", searchFilter.getSubscriptionGroups().toArray()))
+          .stream()
+          .map(detectionAlertConfigDTO -> detectionAlertConfigDTO.getVectorClocks().keySet())
+          .flatMap(Collection::stream)
+          .collect(Collectors.toSet());
+    }
+    if (!searchFilter.getDetectionNames().isEmpty() && !searchFilter.getSubscriptionGroups().isEmpty()) {
+      // intersect the detection config ids if searching by both
+      detectionConfigIds.retainAll(subscribedDetectionConfigIds);
+    } else {
+      detectionConfigIds.addAll(subscribedDetectionConfigIds);
+    }
+    if (!searchFilter.getDetectionNames().isEmpty() || !searchFilter.getSubscriptionGroups().isEmpty()) {
+      // add the predicate using detection config id
+      if (detectionConfigIds.isEmpty()) {
+        // if detection not found, return empty result
+        return ImmutableMap.of("count", 0, "limit", limit, "offset", offset, "elements", Collections.emptyList());
+      }
+      predicate = Predicate.AND(predicate, Predicate.IN("detectionConfigId", detectionConfigIds.toArray()));
+    }
+
+    // search by datasets
+    if (!searchFilter.getDatasets().isEmpty()) {
+      List<Predicate> datasetPredicates = new ArrayList<>();
+      for (String dataset : searchFilter.getDatasets()) {
+        datasetPredicates.add(Predicate.LIKE("collection", "%" + dataset + "%"));
+      }
+      predicate = Predicate.AND(predicate, Predicate.OR(datasetPredicates.toArray(new Predicate[0])));
+    }
+    // search by metrics
+    if (!searchFilter.getMetrics().isEmpty()) {
+      predicate = Predicate.AND(predicate, Predicate.IN("metric", searchFilter.getMetrics().toArray()));
+    }
+    // search by ids
+    if (!searchFilter.getAnomalyIds().isEmpty()) {
+      predicate = Predicate.AND(predicate, Predicate.IN("baseId", searchFilter.getAnomalyIds().toArray()));
+    }
+
+    long count;
+    List<MergedAnomalyResultDTO> results;
+    if (searchFilter.getFeedbacks().isEmpty()) {
+      List<Long> anomalyIds = this.anomalyDAO.findIdsByPredicate(predicate)
+          .stream()
+          .sorted(Comparator.reverseOrder())
+          .collect(Collectors.toList());
+      count = anomalyIds.size();
+      results = anomalyIds.isEmpty() ? Collections.emptyList()
+          : this.anomalyDAO.findByIds(paginateResults(anomalyIds, offset, limit));
+    } else {
+      // filter by feedback types if requested
+      List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(predicate);
+      Set<AnomalyFeedbackType> feedbackFilters =
+          searchFilter.getFeedbacks().stream().map(AnomalyFeedbackType::valueOf).collect(Collectors.toSet());
+      results = anomalies.stream()
+          .filter(anomaly -> (anomaly.getFeedback() == null && feedbackFilters.contains(NO_FEEDBACK)) || (
+              anomaly.getFeedback() != null && feedbackFilters.contains(anomaly.getFeedback().getFeedbackType())))
+          .sorted(Comparator.comparingLong(AbstractDTO::getId).reversed())
+          .collect(Collectors.toList());
+      count = results.size();
+      results = paginateResults(results, offset, limit);
+    }
+    return ImmutableMap.of("count", count, "limit", limit, "offset", offset, "elements", results);
+  }
+
+  private <T> List<T> paginateResults(List<T> list, int offset, int limit) {
+    if (offset < 0 || limit <= 0 || offset >= list.size()) {
+      // negative, zero-sized or out-of-bound page: return an empty page rather than let subList throw
+      return Collections.emptyList();
+    }
+    return list.subList(offset, (int) Math.min((long) offset + limit, list.size())); // long sum avoids int overflow
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/MergedAnomalyResultIndex.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/MergedAnomalyResultIndex.java
index 8e3c768..cfa5c68 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/MergedAnomalyResultIndex.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/entity/MergedAnomalyResultIndex.java
@@ -35,6 +35,7 @@ public class MergedAnomalyResultIndex extends AbstractIndexEntity {
   String metric;
   DimensionMap dimensions;
   boolean notified;
+  boolean child;
 
   public long getDetectionConfigId() {
     return detectionConfigId;
@@ -115,4 +116,12 @@ public class MergedAnomalyResultIndex extends AbstractIndexEntity {
   public void setNotified(boolean notified) {
     this.notified = notified;
   }
+
+  public boolean isChild() {
+    return child;
+  }
+
+  public void setChild(boolean child) {
+    this.child = child;
+  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/resources/schema/create-schema.sql b/thirdeye/thirdeye-pinot/src/main/resources/schema/create-schema.sql
index c93b922..8454348 100644
--- a/thirdeye/thirdeye-pinot/src/main/resources/schema/create-schema.sql
+++ b/thirdeye/thirdeye-pinot/src/main/resources/schema/create-schema.sql
@@ -110,6 +110,7 @@ create table if not exists merged_anomaly_result_index (
     base_id bigint(20) not null,
     create_time timestamp,
     update_time timestamp default current_timestamp,
+    child boolean,
     version int(10)
 ) ENGINE=InnoDB;
 create index merged_anomaly_result_function_idx on merged_anomaly_result_index(function_id);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org