You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2018/11/16 00:08:17 UTC

[incubator-pinot] branch master updated: [TE] site wide impact filter - config global metric from yaml (#3490)

This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6711f67  [TE] site wide impact filter - config global metric from yaml  (#3490)
6711f67 is described below

commit 6711f6769de9baf53552b0829a6c680ed9eedad7
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Thu Nov 15 16:08:12 2018 -0800

    [TE] site wide impact filter - config global metric from yaml  (#3490)
    
    add the capability of configuring global metric from YAML.
---
 .../detection/DefaultInputDataFetcher.java         | 18 ++++-
 .../SitewideImpactRuleAnomalyFilter.java           | 87 ++++++++++++++++++----
 .../spec/SitewideImpactRuleAnomalyFilterSpec.java  | 52 ++++++++++---
 .../thirdeye/detection/spi/components/Tunable.java |  3 +-
 .../thirdeye/detection/spi/model/InputData.java    | 21 +++++-
 .../detection/spi/model/InputDataSpec.java         | 48 ++++++++++--
 .../yaml/CompositePipelineConfigTranslator.java    |  3 +-
 .../SitewideImpactRuleAnomalyFilterTest.java       | 24 +++++-
 8 files changed, 213 insertions(+), 43 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultInputDataFetcher.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultInputDataFetcher.java
index 2d4bb3d..0ecb502 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultInputDataFetcher.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultInputDataFetcher.java
@@ -27,6 +27,7 @@ import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import com.linkedin.thirdeye.detection.spi.model.EventSlice;
 import com.linkedin.thirdeye.detection.spi.model.InputData;
 import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -56,12 +57,21 @@ public class DefaultInputDataFetcher implements InputDataFetcher {
     Map<Long, MetricConfigDTO> metrics = provider.fetchMetrics(inputDataSpec.getMetricIds());
     Map<String, DatasetConfigDTO> datasets = provider.fetchDatasets(inputDataSpec.getDatasetNames());
 
-    Map<Long, DatasetConfigDTO> datasetForMetricId = fetchDatasetForMetricId(provider, inputDataSpec);
-    return new InputData(inputDataSpec, timeseries, aggregates, existingAnomalies, events, metrics, datasets, datasetForMetricId);
+    Map<Long, DatasetConfigDTO> datasetForMetricId = fetchDatasetForMetricId(inputDataSpec.getMetricIdsForDatasets());
+    Map<InputDataSpec.MetricAndDatasetName, MetricConfigDTO> metricForMetricAndDatasetName = fetchMetricForDatasetAndMetricNames(inputDataSpec.getMetricAndDatasetNames());
+    return new InputData(inputDataSpec, timeseries, aggregates, existingAnomalies, events, metrics, datasets, datasetForMetricId, metricForMetricAndDatasetName);
   }
 
-  private Map<Long, DatasetConfigDTO> fetchDatasetForMetricId(DataProvider provider, InputDataSpec inputDataSpec) {
-    Map<Long, MetricConfigDTO> metrics = provider.fetchMetrics(inputDataSpec.getMetricIdsForDatasets());
+  private Map<InputDataSpec.MetricAndDatasetName, MetricConfigDTO> fetchMetricForDatasetAndMetricNames(Collection<InputDataSpec.MetricAndDatasetName> metricNameAndDatasetNames){
+    Map<InputDataSpec.MetricAndDatasetName, MetricConfigDTO> result = new HashMap<>();
+    for (InputDataSpec.MetricAndDatasetName pair : metricNameAndDatasetNames) {
+      result.put(pair, this.provider.fetchMetric(pair.getMetricName(), pair.getDatasetName()));
+    }
+    return result;
+  }
+
+  private Map<Long, DatasetConfigDTO> fetchDatasetForMetricId(Collection<Long> metricIdsForDatasets) {
+    Map<Long, MetricConfigDTO> metrics = provider.fetchMetrics(metricIdsForDatasets);
     Map<Long, String> metricIdToDataSet = new HashMap<>();
     for (Map.Entry<Long, MetricConfigDTO> entry : metrics.entrySet()){
       metricIdToDataSet.put(entry.getKey(), entry.getValue().getDataset());
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilter.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilter.java
index fa95b44..5f85d18 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilter.java
@@ -2,51 +2,86 @@ package com.linkedin.thirdeye.detection.components;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import com.linkedin.thirdeye.dashboard.resources.v2.BaselineParsingUtils;
 import com.linkedin.thirdeye.dataframe.DataFrame;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.detection.InputDataFetcher;
+import com.linkedin.thirdeye.detection.Pattern;
 import com.linkedin.thirdeye.detection.annotation.Components;
+import com.linkedin.thirdeye.detection.annotation.DetectionTag;
 import com.linkedin.thirdeye.detection.spec.SitewideImpactRuleAnomalyFilterSpec;
 import com.linkedin.thirdeye.detection.spi.components.AnomalyFilter;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
 import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
 import com.linkedin.thirdeye.rootcause.timeseries.Baseline;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
 
+
 /**
  * Site-wide impact anomaly filter
  */
-@Components(type = "SITEWIDE_IMPACT_FILTER")
+@Components(type = "SITEWIDE_IMPACT_FILTER", tags = {DetectionTag.RULE_FILTER})
 public class SitewideImpactRuleAnomalyFilter implements AnomalyFilter<SitewideImpactRuleAnomalyFilterSpec> {
   private double threshold;
   private InputDataFetcher dataFetcher;
   private Baseline baseline;
   private String siteWideMetricUrn;
+  private Pattern pattern;
 
   @Override
   public boolean isQualified(MergedAnomalyResultDTO anomaly) {
     MetricEntity me = MetricEntity.fromURN(anomaly.getMetricUrn());
-    MetricSlice currentSlice =
-        MetricSlice.from(me.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
-    MetricSlice baselineSlice = this.baseline.scatter(currentSlice).get(0);
+    List<MetricSlice> slices = new ArrayList<>();
+    MetricSlice currentSlice = MetricSlice.from(me.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
+    slices.add(currentSlice);
+
+    // customize baseline offset
+    MetricSlice baselineSlice = null;
+    if (baseline != null) {
+      baselineSlice = this.baseline.scatter(currentSlice).get(0);
+      slices.add(baselineSlice);
+    }
 
-    String siteWideImpactMetricUrn = Strings.isNullOrEmpty(this.siteWideMetricUrn) ? anomaly.getMetricUrn() : this.siteWideMetricUrn;
-    MetricEntity siteWideEntity = MetricEntity.fromURN(siteWideImpactMetricUrn).withFilters(ArrayListMultimap.<String, String>create());
-    MetricSlice siteWideSlice = this.baseline.scatter(MetricSlice.from(siteWideEntity.getId(), anomaly.getStartTime(), anomaly.getEndTime())).get(0);
+    MetricSlice siteWideSlice;
+    if (Strings.isNullOrEmpty(this.siteWideMetricUrn)) {
+      // if global metric is not set
+      MetricEntity siteWideEntity = MetricEntity.fromURN(anomaly.getMetricUrn());
+      siteWideSlice = MetricSlice.from(siteWideEntity.getId(), anomaly.getStartTime(), anomaly.getEndTime());
+    } else {
+      MetricEntity siteWideEntity = MetricEntity.fromURN(this.siteWideMetricUrn);
+      siteWideSlice = MetricSlice.from(siteWideEntity.getId(), anomaly.getStartTime(), anomaly.getEndTime(),
+          siteWideEntity.getFilters());
+    }
+    slices.add(siteWideSlice);
 
-    Map<MetricSlice, DataFrame> aggregates =
-        this.dataFetcher.fetchData(new InputDataSpec().withAggregateSlices(Arrays.asList(currentSlice, baselineSlice, siteWideSlice))).getAggregates();
+
+
+    Map<MetricSlice, DataFrame> aggregates = this.dataFetcher.fetchData(
+        new InputDataSpec().withAggregateSlices(slices))
+        .getAggregates();
 
     double currentValue = getValueFromAggregates(currentSlice, aggregates);
-    double baselineValue = getValueFromAggregates(baselineSlice, aggregates);
+    double baselineValue = baselineSlice == null ? anomaly.getAvgBaselineVal() : getValueFromAggregates(baselineSlice, aggregates);
     double siteWideBaselineValue = getValueFromAggregates(siteWideSlice, aggregates);
 
-    if (siteWideBaselineValue != 0 && (Math.abs(currentValue - baselineValue) / siteWideBaselineValue) < this.threshold) {
+    // if inconsistent with up/down, filter the anomaly
+    if ((currentValue < baselineValue && pattern.equals(Pattern.UP)) || (currentValue > baselineValue && pattern.equals(
+        Pattern.DOWN))) {
+      return false;
+    }
+    // if doesn't pass the threshold, filter the anomaly
+    if (siteWideBaselineValue != 0
+        && (Math.abs(currentValue - baselineValue) / siteWideBaselineValue) < this.threshold) {
       return false;
     }
 
@@ -56,9 +91,33 @@ public class SitewideImpactRuleAnomalyFilter implements AnomalyFilter<SitewideIm
   @Override
   public void init(SitewideImpactRuleAnomalyFilterSpec spec, InputDataFetcher dataFetcher) {
     this.dataFetcher = dataFetcher;
-    this.baseline = BaselineParsingUtils.parseOffset(spec.getOffset(), spec.getTimezone());
     this.threshold = spec.getThreshold();
-    this.siteWideMetricUrn = spec.getSitewideMetricUrn();
+    this.pattern = Pattern.valueOf(spec.getPattern().toUpperCase());
+
+    // customize baseline offset
+    if (!Strings.isNullOrEmpty(spec.getOffset())){
+      this.baseline = BaselineParsingUtils.parseOffset(spec.getOffset(), spec.getTimezone());
+    }
+
+    if (!Strings.isNullOrEmpty(spec.getSitewideCollection()) && !Strings.isNullOrEmpty(spec.getSitewideMetricName())) {
+      // build filters
+      Map<String, Collection<String>> filterMaps = spec.getFilters();
+      Multimap<String, String> filters = ArrayListMultimap.create();
+      if (filterMaps != null) {
+        for (Map.Entry<String, Collection<String>> entry : filterMaps.entrySet()) {
+          filters.putAll(entry.getKey(), entry.getValue());
+        }
+      }
+
+      // build site wide metric Urn
+      InputDataSpec.MetricAndDatasetName metricAndDatasetName =
+          new InputDataSpec.MetricAndDatasetName(spec.getSitewideMetricName(), spec.getSitewideCollection());
+      InputData data = this.dataFetcher.fetchData(
+          new InputDataSpec().withMetricNamesAndDatasetNames(Collections.singletonList(metricAndDatasetName)));
+      MetricConfigDTO metricConfigDTO = data.getMetricForMetricAndDatasetNames().get(metricAndDatasetName);
+      MetricEntity me = MetricEntity.fromMetric(1.0, metricConfigDTO.getId(), filters);
+      this.siteWideMetricUrn = me.getUrn();
+    }
   }
 
   private double getValueFromAggregates(MetricSlice slice, Map<MetricSlice, DataFrame> aggregates) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/SitewideImpactRuleAnomalyFilterSpec.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/SitewideImpactRuleAnomalyFilterSpec.java
index 7da2799..c0090fa 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/SitewideImpactRuleAnomalyFilterSpec.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/SitewideImpactRuleAnomalyFilterSpec.java
@@ -1,10 +1,50 @@
 package com.linkedin.thirdeye.detection.spec;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
 public class SitewideImpactRuleAnomalyFilterSpec extends AbstractSpec {
   private String timezone = "UTC";
-  private String sitewideMetricUrn;
   private double threshold = Double.NaN;
-  private String offset = "wo1w";
+  private String offset;
+  private String pattern;
+  private String sitewideMetricName;
+  private String sitewideCollection;
+  private Map<String, Collection<String>> filters = new HashMap<>();
+
+  public String getSitewideMetricName() {
+    return sitewideMetricName;
+  }
+
+  public void setSitewideMetricName(String sitewideMetricName) {
+    this.sitewideMetricName = sitewideMetricName;
+  }
+
+  public String getSitewideCollection() {
+    return sitewideCollection;
+  }
+
+  public void setSitewideCollection(String sitewideCollection) {
+    this.sitewideCollection = sitewideCollection;
+  }
+
+  public Map<String, Collection<String>> getFilters() {
+    return filters;
+  }
+
+  public void setFilters(Map<String, Collection<String>> filters) {
+    this.filters = filters;
+  }
+
+  public String getPattern() {
+    return pattern;
+  }
+
+  public void setPattern(String pattern) {
+    this.pattern = pattern;
+  }
 
   public String getOffset() {
     return offset;
@@ -22,14 +62,6 @@ public class SitewideImpactRuleAnomalyFilterSpec extends AbstractSpec {
     this.timezone = timezone;
   }
 
-  public String getSitewideMetricUrn() {
-    return sitewideMetricUrn;
-  }
-
-  public void setSitewideMetricUrn(String sitewideMetricUrn) {
-    this.sitewideMetricUrn = sitewideMetricUrn;
-  }
-
   public double getThreshold() {
     return threshold;
   }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Tunable.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Tunable.java
index 780cb2f..e797fb3 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Tunable.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Tunable.java
@@ -23,7 +23,8 @@ import java.util.Map;
 import org.joda.time.Interval;
 
 /**
- * The tunable. For tuning specs of each component.
+ * The tunable. For tuning specs of each component. Will be initialize with user's input yaml for this
+ * component.
  */
 public interface Tunable<T extends AbstractSpec> extends BaseComponent<T> {
   /**
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputData.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputData.java
index adb51cc..b07c99d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputData.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputData.java
@@ -23,6 +23,7 @@ import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.EventDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 
@@ -38,8 +39,19 @@ public class InputData {
   final Multimap<EventSlice, EventDTO> events;
   final Map<Long, MetricConfigDTO> metrics;
   final Map<String, DatasetConfigDTO> datasets;
+
+  /**
+   * The data set config dtos for metric ids
+   * @see InputDataSpec#withMetricIdsForDataset(Collection)
+   */
   final Map<Long, DatasetConfigDTO> datasetForMetricId;
 
+  /**
+   * The metric config dtos for metric and data set names
+   * @see InputDataSpec#withMetricNamesAndDatasetNames(Collection)
+   */
+  final Map<InputDataSpec.MetricAndDatasetName, MetricConfigDTO> metricForMetricAndDatasetNames;
+
   public InputData(InputDataSpec spec, Map<MetricSlice, DataFrame> timeseries, Map<MetricSlice, DataFrame> aggregates,
       Multimap<AnomalySlice, MergedAnomalyResultDTO> anomalies, Multimap<EventSlice, EventDTO> events) {
     this.dataSpec = spec;
@@ -50,11 +62,13 @@ public class InputData {
     this.metrics = Collections.emptyMap();
     this.datasets = Collections.emptyMap();
     this.datasetForMetricId = Collections.emptyMap();
+    this.metricForMetricAndDatasetNames = Collections.emptyMap();
   }
 
   public InputData(InputDataSpec spec, Map<MetricSlice, DataFrame> timeseries, Map<MetricSlice, DataFrame> aggregates,
       Multimap<AnomalySlice, MergedAnomalyResultDTO> anomalies, Multimap<EventSlice, EventDTO> events,
-      Map<Long, MetricConfigDTO> metrics, Map<String, DatasetConfigDTO> datasets, Map<Long, DatasetConfigDTO> datasetForMetricId) {
+      Map<Long, MetricConfigDTO> metrics, Map<String, DatasetConfigDTO> datasets, Map<Long, DatasetConfigDTO> datasetForMetricId,
+      Map<InputDataSpec.MetricAndDatasetName, MetricConfigDTO> metricForMetricAndDatasetNames) {
     this.dataSpec = spec;
     this.timeseries = timeseries;
     this.aggregates = aggregates;
@@ -63,6 +77,7 @@ public class InputData {
     this.metrics = metrics;
     this.datasets = datasets;
     this.datasetForMetricId = datasetForMetricId;
+    this.metricForMetricAndDatasetNames = metricForMetricAndDatasetNames;
   }
 
   public InputDataSpec getDataSpec() {
@@ -96,4 +111,8 @@ public class InputData {
   public Map<Long, DatasetConfigDTO> getDatasetForMetricId(){
     return datasetForMetricId;
   }
+
+  public Map<InputDataSpec.MetricAndDatasetName, MetricConfigDTO> getMetricForMetricAndDatasetNames() {
+    return metricForMetricAndDatasetNames;
+  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputDataSpec.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputDataSpec.java
index 9bd4304..18d8ae3 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputDataSpec.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputDataSpec.java
@@ -64,6 +64,29 @@ public class InputDataSpec {
   */
   final Collection<Long> metricIdsForDatasets;
 
+  /*
+    Metric name and data set name to fetch the MetricConfigDTO for.
+  */
+  final Collection<MetricAndDatasetName> metricAndDatasetNames;
+
+  public static class MetricAndDatasetName {
+    private final String metricName;
+    private final String datasetName;
+
+    public MetricAndDatasetName(String metricName, String datasetName) {
+      this.metricName = metricName;
+      this.datasetName = datasetName;
+    }
+
+    public String getMetricName() {
+      return metricName;
+    }
+
+    public String getDatasetName() {
+      return datasetName;
+    }
+  }
+
   public InputDataSpec() {
     this.timeseriesSlices = Collections.emptyList();
     this.aggregateSlices = Collections.emptyList();
@@ -72,11 +95,12 @@ public class InputDataSpec {
     this.metricIds = Collections.emptyList();
     this.datasetNames = Collections.emptyList();
     this.metricIdsForDatasets = Collections.emptyList();
+    this.metricAndDatasetNames = Collections.emptyList();
   }
 
   public InputDataSpec(Collection<MetricSlice> timeseriesSlices, Collection<MetricSlice> aggregateSlices,
       Collection<AnomalySlice> anomalySlices, Collection<EventSlice> eventSlices, Collection<Long> metricIds, Collection<String> datasetNames,
-      Collection<Long> metricIdsForDatasets) {
+      Collection<Long> metricIdsForDatasets, Collection<MetricAndDatasetName> metricAndDatasetNames) {
     this.timeseriesSlices = timeseriesSlices;
     this.aggregateSlices = aggregateSlices;
     this.anomalySlices = anomalySlices;
@@ -84,6 +108,7 @@ public class InputDataSpec {
     this.metricIds = metricIds;
     this.datasetNames = datasetNames;
     this.metricIdsForDatasets = metricIdsForDatasets;
+    this.metricAndDatasetNames = metricAndDatasetNames;
   }
 
   public Collection<MetricSlice> getTimeseriesSlices() {
@@ -114,32 +139,39 @@ public class InputDataSpec {
     return metricIdsForDatasets;
   }
 
+  public Collection<MetricAndDatasetName> getMetricAndDatasetNames() {
+    return metricAndDatasetNames;
+  }
+
   public InputDataSpec withTimeseriesSlices(Collection<MetricSlice> timeseriesSlices) {
-    return new InputDataSpec(timeseriesSlices, this.aggregateSlices, this.anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets);
+    return new InputDataSpec(timeseriesSlices, this.aggregateSlices, this.anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets, this.metricAndDatasetNames);
   }
 
   public InputDataSpec withAggregateSlices(Collection<MetricSlice> aggregateSlices) {
-    return new InputDataSpec(this.timeseriesSlices, aggregateSlices, this.anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets);
+    return new InputDataSpec(this.timeseriesSlices, aggregateSlices, this.anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets, this.metricAndDatasetNames);
   }
 
   public InputDataSpec withAnomalySlices(Collection<AnomalySlice> anomalySlices) {
-    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets);
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets, this.metricAndDatasetNames);
   }
 
   public InputDataSpec withEventSlices(Collection<EventSlice> eventSlices) {
-    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets);
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets, this.metricAndDatasetNames);
   }
 
   public InputDataSpec withMetricIds(Collection<Long> metricIds) {
-    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, metricIds, this.datasetNames, this.metricIdsForDatasets);
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, metricIds, this.datasetNames, this.metricIdsForDatasets, this.metricAndDatasetNames);
   }
 
   public InputDataSpec withDatasetNames(Collection<String> datasetNames) {
-    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, this.metricIds, datasetNames, this.metricIdsForDatasets);
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, this.metricIds, datasetNames, this.metricIdsForDatasets, this.metricAndDatasetNames);
   }
 
   public InputDataSpec withMetricIdsForDataset(Collection<Long> metricIdsForDatasets) {
-    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, this.metricIds, this.datasetNames, metricIdsForDatasets);
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, this.metricIds, this.datasetNames, metricIdsForDatasets, this.metricAndDatasetNames);
+  }
 
+  public InputDataSpec withMetricNamesAndDatasetNames(Collection<MetricAndDatasetName> metricNameAndDatasetNames) {
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets, metricNameAndDatasetNames);
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 8834199..4094ba9 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -133,6 +133,7 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
   private final Map<String, Object> components = new HashMap<>();
   private MetricConfigDTO metricConfig;
   private DatasetConfigDTO datasetConfig;
+  private String metricUrn;
 
   public CompositePipelineConfigTranslator(Map<String, Object> yamlConfig, DataProvider provider) {
     super(yamlConfig, provider);
@@ -148,7 +149,7 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
         .get(metricConfig.getDataset());
     Preconditions.checkNotNull(this.datasetConfig, "dataset not found");
 
-    String metricUrn = buildMetricUrn(yamlConfig);
+    this.metricUrn = buildMetricUrn(yamlConfig);
     String cron = buildCron();
 
     List<Map<String, Object>> ruleYamls = getList(yamlConfig.get(PROP_RULES));
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilterTest.java
index fb9f57a..cbe1b5b 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilterTest.java
@@ -43,7 +43,6 @@ import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
 public class SitewideImpactRuleAnomalyFilterTest {
   private static final String METRIC_URN = "thirdeye:metric:123";
 
-  private List<MergedAnomalyResultDTO> anomalies;
   private DataProvider testDataProvider;
   private Baseline baseline;
 
@@ -62,8 +61,6 @@ public class SitewideImpactRuleAnomalyFilterTest {
     aggregates.put(slice2, new DataFrame().addSeries(COL_VALUE, 500));
     aggregates.put(baselineSlice2, new DataFrame().addSeries(COL_VALUE, 1000));
 
-    this.anomalies = Arrays.asList(makeAnomaly(0, 2), makeAnomaly(4, 6));
-
     this.testDataProvider = new MockDataProvider().setAggregates(aggregates);
   }
 
@@ -72,11 +69,12 @@ public class SitewideImpactRuleAnomalyFilterTest {
     SitewideImpactRuleAnomalyFilterSpec spec = new SitewideImpactRuleAnomalyFilterSpec();
     spec.setThreshold(0.5);
     spec.setOffset("median3w");
+    spec.setPattern("down");
     SitewideImpactRuleAnomalyFilter filter = new SitewideImpactRuleAnomalyFilter();
     filter.init(spec, new DefaultInputDataFetcher(this.testDataProvider, 125L));
 
     List<Boolean> results =
-        this.anomalies.stream().map(anomaly -> filter.isQualified(anomaly)).collect(Collectors.toList());
+        Arrays.asList(makeAnomaly(0, 2), makeAnomaly(4, 6)).stream().map(anomaly -> filter.isQualified(anomaly)).collect(Collectors.toList());
     Assert.assertEquals(results, Arrays.asList(false, true));
   }
 
@@ -86,4 +84,22 @@ public class SitewideImpactRuleAnomalyFilterTest {
     anomaly.setMetricUrn(METRIC_URN);
     return anomaly;
   }
+
+  @Test
+  public void testSiteWideImpactFilterNoOffset() {
+    SitewideImpactRuleAnomalyFilterSpec spec = new SitewideImpactRuleAnomalyFilterSpec();
+    spec.setThreshold(0.5);
+    spec.setPattern("down");
+    SitewideImpactRuleAnomalyFilter filter = new SitewideImpactRuleAnomalyFilter();
+    filter.init(spec, new DefaultInputDataFetcher(this.testDataProvider, 125L));
+    List<MergedAnomalyResultDTO> anomalyResultDTOs = Arrays.asList(makeAnomaly(0, 2), makeAnomaly(4, 6));
+    anomalyResultDTOs.get(0).setAvgCurrentVal(150);
+    anomalyResultDTOs.get(0).setAvgBaselineVal(200);
+    anomalyResultDTOs.get(1).setAvgCurrentVal(500);
+    anomalyResultDTOs.get(1).setAvgBaselineVal(1000);
+
+    List<Boolean> results =
+        anomalyResultDTOs.stream().map(anomaly -> filter.isQualified(anomaly)).collect(Collectors.toList());
+    Assert.assertEquals(results, Arrays.asList(false, true));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org