You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ak...@apache.org on 2019/04/19 19:34:49 UTC

[incubator-pinot] branch master updated: [TE] Early termination if there are no 'to' recipients (#4145)

This is an automated email from the ASF dual-hosted git repository.

akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 96752f7  [TE] Early termination if there are no 'to' recipients (#4145)
96752f7 is described below

commit 96752f7927968bc13bf6016dc203e29a48066c32
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Fri Apr 19 12:34:44 2019 -0700

    [TE] Early termination if there are no 'to' recipients (#4145)
---
 .../api/user/dashboard/UserDashboardResource.java  |  4 ---
 .../ToAllRecipientsDetectionAlertFilter.java       | 33 ++++++++++++++++------
 .../ToAllRecipientsDetectionAlertFilterTest.java   | 15 ++++++++--
 3 files changed, 38 insertions(+), 14 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
index ad2e50c..48612f2 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
@@ -52,10 +52,6 @@ import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.datalayer.util.Predicate;
-import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
-import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
-import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
-import org.apache.pinot.thirdeye.detection.CurrentAndBaselineLoader;
 import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java
index 18a6771..906f3ef 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java
@@ -19,6 +19,9 @@
 
 package org.apache.pinot.thirdeye.detection.alert.filter;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+import java.util.stream.Collectors;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.detection.ConfigUtils;
@@ -26,7 +29,6 @@ import org.apache.pinot.thirdeye.detection.DataProvider;
 import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterRecipients;
 import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterResult;
 import org.apache.pinot.thirdeye.detection.alert.StatefulDetectionAlertFilter;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -34,7 +36,6 @@ import java.util.Set;
 import org.apache.commons.collections.MapUtils;
 import org.apache.pinot.thirdeye.detection.annotation.AlertFilter;
 
-
 /**
  * The detection alert filter that sends the anomaly email to all recipients
  */
@@ -51,14 +52,14 @@ public class ToAllRecipientsDetectionAlertFilter extends StatefulDetectionAlertF
   Set<String> to;
   Set<String> cc;
   Set<String> bcc;
-  final Map<String, Set<String>> recipients;
+  final SetMultimap<String, String> recipients;
   List<Long> detectionConfigIds;
   boolean sendOnce;
 
   public ToAllRecipientsDetectionAlertFilter(DataProvider provider, DetectionAlertConfigDTO config, long endTime) {
     super(provider, config, endTime);
 
-    this.recipients = ConfigUtils.getMap(this.config.getProperties().get(PROP_RECIPIENTS));
+    this.recipients = HashMultimap.create(ConfigUtils.<String, String>getMultimap(this.config.getProperties().get(PROP_RECIPIENTS)));
     this.detectionConfigIds = ConfigUtils.getLongs(this.config.getProperties().get(PROP_DETECTION_CONFIG_IDS));
     this.sendOnce = MapUtils.getBoolean(this.config.getProperties(), PROP_SEND_ONCE, true);
   }
@@ -69,15 +70,31 @@ public class ToAllRecipientsDetectionAlertFilter extends StatefulDetectionAlertF
 
     final long minId = getMinId(highWaterMark);
 
-    Set<MergedAnomalyResultDTO> anomalies = this.filter(this.makeVectorClocks(this.detectionConfigIds), minId);
+    to = cleanupRecipients(this.recipients.get(PROP_TO));
+    cc = cleanupRecipients(this.recipients.get(PROP_CC));
+    bcc = cleanupRecipients(this.recipients.get(PROP_BCC));
+
+    // Early termination if there are no recipients in the "to" field
+    if (to.isEmpty()) {
+      return result;
+    }
 
-    to = (this.recipients.get(PROP_TO) == null) ? Collections.emptySet() : new HashSet<>(this.recipients.get(PROP_TO));
-    cc = (this.recipients.get(PROP_CC) == null) ? Collections.emptySet() : new HashSet<>(this.recipients.get(PROP_CC));
-    bcc = (this.recipients.get(PROP_BCC) == null) ? Collections.emptySet() : new HashSet<>(this.recipients.get(PROP_BCC));
+    // Fetch all the anomalies to be notified to the recipients
+    Set<MergedAnomalyResultDTO> anomalies = this.filter(this.makeVectorClocks(this.detectionConfigIds), minId);
 
     return result.addMapping(new DetectionAlertFilterRecipients(to, cc, bcc), anomalies);
   }
 
+  private Set<String> cleanupRecipients(Set<String> recipient) {
+    Set<String> filteredRecipients = new HashSet<>();
+    if (recipient != null) {
+      filteredRecipients.addAll(recipient);
+      filteredRecipients = filteredRecipients.stream().map(String::trim).collect(Collectors.toSet());
+      filteredRecipients.removeIf(rec -> rec == null || "".equals(rec));
+    }
+    return filteredRecipients;
+  }
+
   private long getMinId(long highWaterMark) {
     if (this.sendOnce) {
       return highWaterMark + 1;
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java
index 138d054..d3570c3 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java
@@ -20,6 +20,7 @@ import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
 import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFeedbackDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
 import org.apache.pinot.thirdeye.detection.MockDataProvider;
 import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilter;
 import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterRecipients;
@@ -45,6 +46,7 @@ public class ToAllRecipientsDetectionAlertFilterTest {
   private static final String PROP_TO = "to";
   private static final String PROP_CC = "cc";
   private static final String PROP_BCC = "bcc";
+  private static final Set<String> PROP_EMPTY_TO_VALUE = new HashSet<>();
   private static final Set<String> PROP_TO_VALUE = new HashSet<>(Arrays.asList("test@test.com", "test@test.org"));
   private static final Set<String> PROP_CC_VALUE = new HashSet<>(Arrays.asList("cctest@test.com", "cctest@test.org"));
   private static final Set<String> PROP_BCC_VALUE = new HashSet<>(Arrays.asList("bcctest@test.com", "bcctest@test.org"));
@@ -60,7 +62,6 @@ public class ToAllRecipientsDetectionAlertFilterTest {
 
   private MockDataProvider provider;
   private DetectionAlertConfigDTO alertConfig;
-  private DetectionAlertConfigDTO alertConfigForLegacyAnomalies;
 
   @BeforeMethod
   public void beforeMethod() {
@@ -81,7 +82,6 @@ public class ToAllRecipientsDetectionAlertFilterTest {
         .setAnomalies(this.detectedAnomalies);
 
     this.alertConfig = createDetectionAlertConfig();
-    this.alertConfigForLegacyAnomalies = createDetectionAlertConfig();
   }
 
   private DetectionAlertConfigDTO createDetectionAlertConfig() {
@@ -197,4 +197,15 @@ public class ToAllRecipientsDetectionAlertFilterTest {
     Assert.assertTrue(result.getResult().get(RECIPIENTS).contains(existingNew));
     Assert.assertTrue(result.getResult().get(RECIPIENTS).contains(existingFuture));
   }
+
+  @Test
+  public void testGetAlertFilterResultWhenNoRecipient() throws Exception {
+    Map<String, Object> properties = ConfigUtils.getMap(this.alertConfig.getProperties().get(PROP_RECIPIENTS));
+    properties.put(PROP_TO, PROP_EMPTY_TO_VALUE);
+    this.alertConfig.setProperties(properties);
+    this.alertFilter = new ToAllRecipientsDetectionAlertFilter(this.provider, this.alertConfig,2500L);
+
+    DetectionAlertFilterResult result = this.alertFilter.run();
+    Assert.assertEquals(result.getResult().size(), 0);
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org