You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ak...@apache.org on 2019/02/20 00:18:37 UTC
[incubator-pinot] branch master updated: [TE] Fix bugs in user dashboard endpoint (#3853)
This is an automated email from the ASF dual-hosted git repository.
akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0675aee [TE] Fix bugs in user dashboard endpoint (#3853)
0675aee is described below
commit 0675aee34f00d7684579c3b18a3e4dd6bff7acc1
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Tue Feb 19 16:18:32 2019 -0800
[TE] Fix bugs in user dashboard endpoint (#3853)
---
.../api/user/dashboard/UserDashboardResource.java | 221 ++++++++++++---------
.../detection/DetectionMigrationResource.java | 2 +
.../resource/v2/UserDashboardResourceTest.java | 13 +-
3 files changed, 134 insertions(+), 102 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
index 1e889b0..bbf2e9e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
@@ -42,7 +42,6 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.thirdeye.api.Constants;
import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
@@ -65,11 +64,14 @@ import org.apache.pinot.thirdeye.datalayer.util.Predicate;
import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
import org.apache.pinot.thirdeye.detection.CurrentAndBaselineLoader;
import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.thirdeye.detection.yaml.YamlDetectionAlertConfigTranslator.*;
+
/**
* Endpoints for user-customized dashboards (currently alerts only)
@@ -167,8 +169,6 @@ public class UserDashboardResource {
@QueryParam("fetchTrueAnomaly") @DefaultValue("false") boolean fetchTrueAnomaly,
@ApiParam(value = "max number of results")
@QueryParam("limit") Integer limit) throws Exception {
- Map<String, String> responseMessage = new HashMap<>();
- List<Predicate> predicates = new ArrayList<>();
LOG.info("[USER DASHBOARD] Fetching anomalies with filters. Start: " + start + " end: " + end + " metric: "
+ metric + " dataset: " + dataset + " owner: " + owner + " application: " + application + " group: " + group
+ " fetchTrueAnomaly: " + fetchTrueAnomaly + " limit: " + limit);
@@ -178,98 +178,22 @@ public class UserDashboardResource {
LOG.warn("No upper limit specified while fetching anomalies. Defaulting to " + ANOMALIES_LIMIT_DEFAULT);
limit = ANOMALIES_LIMIT_DEFAULT;
}
-
- // Filter by metric and dataset
- if (metric != null) {
- predicates.add(Predicate.EQ("metric", metric));
- }
- if (dataset != null) {
- predicates.add(Predicate.EQ("collection", dataset));
- }
-
- // anomaly window start and end
Preconditions.checkNotNull(start, "Please specify the start time of the anomaly retrieval window");
- predicates.add(Predicate.GE("endTime", start));
- if (end != null) {
- predicates.add(Predicate.LT("startTime", end));
- }
// TODO support index select on user-reported anomalies
// predicates.add(Predicate.OR(
// Predicate.EQ("notified", true),
// Predicate.EQ("anomalyResultSource", AnomalyResultSource.USER_LABELED_ANOMALY)));
- // application (indirect)
- Set<Long> applicationFunctionIds = new HashSet<>();
- if (StringUtils.isNotBlank(application)) {
- List<AnomalyFunctionDTO> functions = this.functionDAO.findAllByApplication(application);
- for (AnomalyFunctionDTO function : functions) {
- if (function.getIsActive()) {
- applicationFunctionIds.add(function.getId());
- }
- }
- }
-
- // TODO: deprecate after migration
- // Support for partially migrated alerts.
- List<DetectionAlertConfigDTO> notifications = detectionAlertDAO.findByPredicate(Predicate.EQ("application", application));
- for (DetectionAlertConfigDTO notification : notifications) {
- applicationFunctionIds.addAll(notification.getVectorClocks().keySet());
- }
-
- // group (indirect)
- Set<Long> groupFunctionIds = new HashSet<>();
- if (StringUtils.isNotBlank(group)) {
- AlertConfigDTO alert = this.alertDAO.findWhereNameEquals(group);
- if (alert != null) {
- groupFunctionIds.addAll(alert.getEmailConfig().getFunctionIds());
- }
- }
-
- // owner (indirect)
- Set<Long> ownerFunctionIds = new HashSet<>();
- if (StringUtils.isNotBlank(owner)) {
- // TODO: replace database scan with targeted select
- List<AnomalyFunctionDTO> functions = this.functionDAO.findAll();
- for (AnomalyFunctionDTO function : functions) {
- if (Objects.equals(function.getCreatedBy(), owner)) {
- ownerFunctionIds.add(function.getId());
- }
- }
- }
-
- // anomaly function ids
- List<Predicate> oldPredicates = new ArrayList<>(predicates);
- if (StringUtils.isNotBlank(application) || StringUtils.isNotBlank(group) || StringUtils.isNotBlank(owner)) {
- Set<Long> functionIds = new HashSet<>();
- functionIds.addAll(applicationFunctionIds);
- functionIds.addAll(groupFunctionIds);
- functionIds.addAll(ownerFunctionIds);
-
- oldPredicates.add(Predicate.IN("functionId", functionIds.toArray()));
- }
-
- // fetch legacy anomalies via predicatesprincipal
- List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(Predicate.AND(oldPredicates.toArray(new Predicate[oldPredicates.size()])));
- // filter (un-notified && non-user-reported) anomalies
- // TODO remove once index select on user-reported anomalies available
- Iterator<MergedAnomalyResultDTO> itAnomaly = anomalies.iterator();
- while (itAnomaly.hasNext()) {
- MergedAnomalyResultDTO anomaly = itAnomaly.next();
- if (!anomaly.isNotified() &&
- !AnomalyResultSource.USER_LABELED_ANOMALY.equals(anomaly.getAnomalyResultSource())) {
- itAnomaly.remove();
- }
- }
-
- // fetch new detection framework anomalies by group
- anomalies.addAll(fetchFrameworkAnomaliesByGroup(start, end, group));
-
- // fetch new detection framework anomalies by application
- anomalies.addAll(fetchFrameworkAnomaliesByApplication(start, end, application));
-
- // fetch new detection framework anomalies by metric and/or dataset
- anomalies.addAll(fetchFrameworkAnomaliesByMetricDataset(predicates));
+ // TODO: Prefer to have intersection of anomalies rather than union
+ List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
+ anomalies.addAll(fetchLegacyAnomaliesByFunctionId(start, end, group, application, owner));
+ // Fetch anomalies by group
+ anomalies.addAll(fetchAnomaliesBySubsGroup(start, end, group));
+ // Fetch anomalies by application
+ anomalies.addAll(fetchAnomaliesByApplication(start, end, application));
+ // Fetch anomalies by metric and/or dataset
+ anomalies.addAll(fetchAnomaliesByMetricDataset(start, end, metric, dataset));
// sort descending by start time
Collections.sort(anomalies, new Comparator<MergedAnomalyResultDTO>() {
@@ -370,13 +294,118 @@ public class UserDashboardResource {
return output;
}
- private Collection<? extends MergedAnomalyResultDTO> fetchFrameworkAnomaliesByMetricDataset(
- List<Predicate> predicates) {
- predicates.add(Predicate.NEQ("detectionConfigId", 0));
+ @Deprecated
+ private Collection<MergedAnomalyResultDTO> fetchLegacyAnomaliesByFunctionId(Long start, Long end, String group, String application, String owner) {
+ // Find functionIds which belong to application, subscription group and owner.
+ List<Predicate> predicates = new ArrayList<>();
+ Set<Long> functionIds = new HashSet<>();
+
+ // application (indirect)
+ if (StringUtils.isNotBlank(application)) {
+ List<AnomalyFunctionDTO> functions = this.functionDAO.findAllByApplication(application);
+ for (AnomalyFunctionDTO function : functions) {
+ if (function.getIsActive()) {
+ functionIds.add(function.getId());
+ }
+ }
+ }
+ // Support for partially migrated alerts.
+ List<DetectionAlertConfigDTO> notifications = detectionAlertDAO.findByPredicate(Predicate.EQ("application", application));
+ for (DetectionAlertConfigDTO notification : notifications) {
+ for (long id : ConfigUtils.getLongs(notification.getProperties().get(PROP_DETECTION_CONFIG_IDS))) {
+ AnomalyFunctionDTO function = this.functionDAO.findById(id);
+ if (function != null && function.getIsActive()) {
+ functionIds.add(id);
+ }
+ }
+ }
+
+ // group (indirect)
+ Set<Long> groupFunctionIds = new HashSet<>();
+ if (StringUtils.isNotBlank(group)) {
+ AlertConfigDTO alert = this.alertDAO.findWhereNameEquals(group);
+ if (alert != null) {
+ for (long id : alert.getEmailConfig().getFunctionIds()) {
+ AnomalyFunctionDTO function = this.functionDAO.findById(id);
+ if (function != null && function.getIsActive()) {
+ groupFunctionIds.add(id);
+ }
+ }
+ }
+ }
+ if (!groupFunctionIds.isEmpty()) {
+ if (functionIds.isEmpty()) {
+ functionIds = groupFunctionIds;
+ } else {
+ functionIds.retainAll(groupFunctionIds);
+ }
+ }
+
+ // owner (indirect)
+ Set<Long> ownerFunctionIds = new HashSet<>();
+ if (StringUtils.isNotBlank(owner)) {
+ // TODO: replace database scan with targeted select
+ List<AnomalyFunctionDTO> functions = this.functionDAO.findAll();
+ for (AnomalyFunctionDTO function : functions) {
+ if (function.getIsActive() && Objects.equals(function.getCreatedBy(), owner)) {
+ ownerFunctionIds.add(function.getId());
+ }
+ }
+ }
+ if (!ownerFunctionIds.isEmpty()) {
+ if (functionIds.isEmpty()) {
+ functionIds = ownerFunctionIds;
+ } else {
+ functionIds.retainAll(ownerFunctionIds);
+ }
+ }
+
+ // Predicate on start time end time and function Id.
+ predicates.add(Predicate.IN("functionId", functionIds.toArray()));
+ predicates.add(Predicate.GE("endTime", start));
+ if (end != null) {
+ predicates.add(Predicate.LT("startTime", end));
+ }
+
+ // Fetch legacy anomalies via predicates
+ List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(Predicate.AND(predicates.toArray(new Predicate[predicates.size()])));
+ // filter (un-notified && non-user-reported) anomalies
+ // TODO remove once index select on user-reported anomalies available
+ Iterator<MergedAnomalyResultDTO> itAnomaly = anomalies.iterator();
+ while (itAnomaly.hasNext()) {
+ MergedAnomalyResultDTO anomaly = itAnomaly.next();
+ if (!anomaly.isNotified() &&
+ !AnomalyResultSource.USER_LABELED_ANOMALY.equals(anomaly.getAnomalyResultSource())) {
+ itAnomaly.remove();
+ }
+ }
+
+ return anomalies;
+ }
+
+ private Collection<MergedAnomalyResultDTO> fetchAnomaliesByMetricDataset(Long start, Long end, String metric, String dataset) {
+ if (StringUtils.isBlank(metric) && StringUtils.isBlank(dataset)) {
+ return Collections.emptyList();
+ }
+
+ List<Predicate> predicates = new ArrayList<>();
+ predicates.add(Predicate.GE("endTime", start));
+ if (end != null) {
+ predicates.add(Predicate.LT("startTime", end));
+ }
+
+ // Filter by metric and dataset
+ if (metric != null) {
+ predicates.add(Predicate.EQ("metric", metric));
+ }
+ if (dataset != null) {
+ predicates.add(Predicate.EQ("collection", dataset));
+ }
+
return this.anomalyDAO.findByPredicate(Predicate.AND(predicates.toArray(new Predicate[predicates.size()])));
}
- private Collection<MergedAnomalyResultDTO> fetchFrameworkAnomaliesByApplication(Long start, Long end, String application) throws Exception {
+ private Collection<MergedAnomalyResultDTO> fetchAnomaliesByApplication(Long start, Long end, String application) throws Exception {
if (StringUtils.isBlank(application)) {
return Collections.emptyList();
}
@@ -389,10 +418,10 @@ public class UserDashboardResource {
detectionConfigIds.addAll(alertConfigDTO.getVectorClocks().keySet());
}
- return fetchFrameworkAnomaliesByConfigIds(start, end, detectionConfigIds);
+ return fetchAnomaliesByConfigIds(start, end, detectionConfigIds);
}
- private Collection<MergedAnomalyResultDTO> fetchFrameworkAnomaliesByGroup(Long start, Long end, String group) throws Exception {
+ private Collection<MergedAnomalyResultDTO> fetchAnomaliesBySubsGroup(Long start, Long end, String group) throws Exception {
if (StringUtils.isBlank(group)) {
return Collections.emptyList();
}
@@ -405,10 +434,10 @@ public class UserDashboardResource {
detectionConfigIds.addAll(alertConfigDTO.getVectorClocks().keySet());
}
- return fetchFrameworkAnomaliesByConfigIds(start, end, detectionConfigIds);
+ return fetchAnomaliesByConfigIds(start, end, detectionConfigIds);
}
- private Collection<MergedAnomalyResultDTO> fetchFrameworkAnomaliesByConfigIds(Long start, Long end, Set<Long> detectionConfigIds) throws Exception {
+ private Collection<MergedAnomalyResultDTO> fetchAnomaliesByConfigIds(Long start, Long end, Set<Long> detectionConfigIds) throws Exception {
if (detectionConfigIds.isEmpty()) {
return Collections.emptyList();
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
index d7d728a..2b9269f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
@@ -617,6 +617,8 @@ public class DetectionMigrationResource {
}
@POST
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
@ApiOperation("migrate all applications")
@Path("/applications")
public Response migrateApplication() {
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java
index dc26f0c..3f97f7c 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java
@@ -75,6 +75,7 @@ public class UserDashboardResourceTest {
this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(100, 500, this.functionIds.get(0), "test_metric", "test_dataset"))); // func A
this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(800, 1200, this.functionIds.get(0), "test_metric", "test_dataset"))); // func A
this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(300, 1500, this.functionIds.get(1), "test_metric", "test_dataset"))); // func B
+ this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(300, 1600, this.functionIds.get(2), "test_metric", "test_dataset"))); // func C
this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(300, 1600, this.functionIds.get(2), "test_metric_2", "test_dataset"))); // func C
for (Long id : this.anomalyIds) {
@@ -124,8 +125,8 @@ public class UserDashboardResourceTest {
@Test
public void testAnomaliesByGroup() throws Exception {
List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, null, "myAlertB", null, null, false, null);
- Assert.assertEquals(anomalies.size(), 1);
- Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(3)));
+ Assert.assertEquals(anomalies.size(), 2);
+ Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(3), this.anomalyIds.get(4)));
}
@Test
@@ -144,15 +145,15 @@ public class UserDashboardResourceTest {
@Test
public void testAnomaliesByMetric() throws Exception {
List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, null, null, "test_metric", "test_dataset", false, null);
- Assert.assertEquals(anomalies.size(), 2);
- Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1), this.anomalyIds.get(2)));
+ Assert.assertEquals(anomalies.size(), 3);
+ Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1), this.anomalyIds.get(2), this.anomalyIds.get(3)));
}
@Test
public void testAnomaliesByDataset() throws Exception {
List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, null, null, null, "test_dataset", false, null);
- Assert.assertEquals(anomalies.size(), 3);
- Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1), this.anomalyIds.get(2), this.anomalyIds.get(3)));
+ Assert.assertEquals(anomalies.size(), 4);
+ Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1), this.anomalyIds.get(2), this.anomalyIds.get(3), this.anomalyIds.get(4)));
}
private MergedAnomalyResultDTO makeAnomaly(long start, long end, Long functionId, String metric, String dataset) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org