You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pinot.apache.org by GitBox <gi...@apache.org> on 2018/11/16 00:08:14 UTC

[GitHub] apucher closed pull request #3490: [TE] site wide impact filter - config global metric from yaml

apucher closed pull request #3490: [TE] site wide impact filter - config global metric from yaml 
URL: https://github.com/apache/incubator-pinot/pull/3490
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request came from a fork, GitHub does not keep its
diff visible after the merge, so the full diff is reproduced below:

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultInputDataFetcher.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultInputDataFetcher.java
index 2d4bb3d179..0ecb502149 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultInputDataFetcher.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultInputDataFetcher.java
@@ -27,6 +27,7 @@
 import com.linkedin.thirdeye.detection.spi.model.EventSlice;
 import com.linkedin.thirdeye.detection.spi.model.InputData;
 import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -56,12 +57,21 @@ public InputData fetchData(InputDataSpec inputDataSpec) {
     Map<Long, MetricConfigDTO> metrics = provider.fetchMetrics(inputDataSpec.getMetricIds());
     Map<String, DatasetConfigDTO> datasets = provider.fetchDatasets(inputDataSpec.getDatasetNames());
 
-    Map<Long, DatasetConfigDTO> datasetForMetricId = fetchDatasetForMetricId(provider, inputDataSpec);
-    return new InputData(inputDataSpec, timeseries, aggregates, existingAnomalies, events, metrics, datasets, datasetForMetricId);
+    Map<Long, DatasetConfigDTO> datasetForMetricId = fetchDatasetForMetricId(inputDataSpec.getMetricIdsForDatasets());
+    Map<InputDataSpec.MetricAndDatasetName, MetricConfigDTO> metricForMetricAndDatasetName = fetchMetricForDatasetAndMetricNames(inputDataSpec.getMetricAndDatasetNames());
+    return new InputData(inputDataSpec, timeseries, aggregates, existingAnomalies, events, metrics, datasets, datasetForMetricId, metricForMetricAndDatasetName);
   }
 
-  private Map<Long, DatasetConfigDTO> fetchDatasetForMetricId(DataProvider provider, InputDataSpec inputDataSpec) {
-    Map<Long, MetricConfigDTO> metrics = provider.fetchMetrics(inputDataSpec.getMetricIdsForDatasets());
+  private Map<InputDataSpec.MetricAndDatasetName, MetricConfigDTO> fetchMetricForDatasetAndMetricNames(Collection<InputDataSpec.MetricAndDatasetName> metricNameAndDatasetNames){
+    Map<InputDataSpec.MetricAndDatasetName, MetricConfigDTO> result = new HashMap<>();
+    for (InputDataSpec.MetricAndDatasetName pair : metricNameAndDatasetNames) {
+      result.put(pair, this.provider.fetchMetric(pair.getMetricName(), pair.getDatasetName()));
+    }
+    return result;
+  }
+
+  private Map<Long, DatasetConfigDTO> fetchDatasetForMetricId(Collection<Long> metricIdsForDatasets) {
+    Map<Long, MetricConfigDTO> metrics = provider.fetchMetrics(metricIdsForDatasets);
     Map<Long, String> metricIdToDataSet = new HashMap<>();
     for (Map.Entry<Long, MetricConfigDTO> entry : metrics.entrySet()){
       metricIdToDataSet.put(entry.getKey(), entry.getValue().getDataset());
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilter.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilter.java
index fa95b447d9..5f85d18e67 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilter.java
@@ -2,51 +2,86 @@
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import com.linkedin.thirdeye.dashboard.resources.v2.BaselineParsingUtils;
 import com.linkedin.thirdeye.dataframe.DataFrame;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.detection.InputDataFetcher;
+import com.linkedin.thirdeye.detection.Pattern;
 import com.linkedin.thirdeye.detection.annotation.Components;
+import com.linkedin.thirdeye.detection.annotation.DetectionTag;
 import com.linkedin.thirdeye.detection.spec.SitewideImpactRuleAnomalyFilterSpec;
 import com.linkedin.thirdeye.detection.spi.components.AnomalyFilter;
+import com.linkedin.thirdeye.detection.spi.model.InputData;
 import com.linkedin.thirdeye.detection.spi.model.InputDataSpec;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
 import com.linkedin.thirdeye.rootcause.timeseries.Baseline;
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import static com.linkedin.thirdeye.dataframe.util.DataFrameUtils.*;
 
+
 /**
  * Site-wide impact anomaly filter
  */
-@Components(type = "SITEWIDE_IMPACT_FILTER")
+@Components(type = "SITEWIDE_IMPACT_FILTER", tags = {DetectionTag.RULE_FILTER})
 public class SitewideImpactRuleAnomalyFilter implements AnomalyFilter<SitewideImpactRuleAnomalyFilterSpec> {
   private double threshold;
   private InputDataFetcher dataFetcher;
   private Baseline baseline;
   private String siteWideMetricUrn;
+  private Pattern pattern;
 
   @Override
   public boolean isQualified(MergedAnomalyResultDTO anomaly) {
     MetricEntity me = MetricEntity.fromURN(anomaly.getMetricUrn());
-    MetricSlice currentSlice =
-        MetricSlice.from(me.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
-    MetricSlice baselineSlice = this.baseline.scatter(currentSlice).get(0);
+    List<MetricSlice> slices = new ArrayList<>();
+    MetricSlice currentSlice = MetricSlice.from(me.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
+    slices.add(currentSlice);
+
+    // customize baseline offset
+    MetricSlice baselineSlice = null;
+    if (baseline != null) {
+      baselineSlice = this.baseline.scatter(currentSlice).get(0);
+      slices.add(baselineSlice);
+    }
 
-    String siteWideImpactMetricUrn = Strings.isNullOrEmpty(this.siteWideMetricUrn) ? anomaly.getMetricUrn() : this.siteWideMetricUrn;
-    MetricEntity siteWideEntity = MetricEntity.fromURN(siteWideImpactMetricUrn).withFilters(ArrayListMultimap.<String, String>create());
-    MetricSlice siteWideSlice = this.baseline.scatter(MetricSlice.from(siteWideEntity.getId(), anomaly.getStartTime(), anomaly.getEndTime())).get(0);
+    MetricSlice siteWideSlice;
+    if (Strings.isNullOrEmpty(this.siteWideMetricUrn)) {
+      // if global metric is not set
+      MetricEntity siteWideEntity = MetricEntity.fromURN(anomaly.getMetricUrn());
+      siteWideSlice = MetricSlice.from(siteWideEntity.getId(), anomaly.getStartTime(), anomaly.getEndTime());
+    } else {
+      MetricEntity siteWideEntity = MetricEntity.fromURN(this.siteWideMetricUrn);
+      siteWideSlice = MetricSlice.from(siteWideEntity.getId(), anomaly.getStartTime(), anomaly.getEndTime(),
+          siteWideEntity.getFilters());
+    }
+    slices.add(siteWideSlice);
 
-    Map<MetricSlice, DataFrame> aggregates =
-        this.dataFetcher.fetchData(new InputDataSpec().withAggregateSlices(Arrays.asList(currentSlice, baselineSlice, siteWideSlice))).getAggregates();
+
+
+    Map<MetricSlice, DataFrame> aggregates = this.dataFetcher.fetchData(
+        new InputDataSpec().withAggregateSlices(slices))
+        .getAggregates();
 
     double currentValue = getValueFromAggregates(currentSlice, aggregates);
-    double baselineValue = getValueFromAggregates(baselineSlice, aggregates);
+    double baselineValue = baselineSlice == null ? anomaly.getAvgBaselineVal() : getValueFromAggregates(baselineSlice, aggregates);
     double siteWideBaselineValue = getValueFromAggregates(siteWideSlice, aggregates);
 
-    if (siteWideBaselineValue != 0 && (Math.abs(currentValue - baselineValue) / siteWideBaselineValue) < this.threshold) {
+    // if inconsistent with up/down, filter the anomaly
+    if ((currentValue < baselineValue && pattern.equals(Pattern.UP)) || (currentValue > baselineValue && pattern.equals(
+        Pattern.DOWN))) {
+      return false;
+    }
+    // if doesn't pass the threshold, filter the anomaly
+    if (siteWideBaselineValue != 0
+        && (Math.abs(currentValue - baselineValue) / siteWideBaselineValue) < this.threshold) {
       return false;
     }
 
@@ -56,9 +91,33 @@ public boolean isQualified(MergedAnomalyResultDTO anomaly) {
   @Override
   public void init(SitewideImpactRuleAnomalyFilterSpec spec, InputDataFetcher dataFetcher) {
     this.dataFetcher = dataFetcher;
-    this.baseline = BaselineParsingUtils.parseOffset(spec.getOffset(), spec.getTimezone());
     this.threshold = spec.getThreshold();
-    this.siteWideMetricUrn = spec.getSitewideMetricUrn();
+    this.pattern = Pattern.valueOf(spec.getPattern().toUpperCase());
+
+    // customize baseline offset
+    if (!Strings.isNullOrEmpty(spec.getOffset())){
+      this.baseline = BaselineParsingUtils.parseOffset(spec.getOffset(), spec.getTimezone());
+    }
+
+    if (!Strings.isNullOrEmpty(spec.getSitewideCollection()) && !Strings.isNullOrEmpty(spec.getSitewideMetricName())) {
+      // build filters
+      Map<String, Collection<String>> filterMaps = spec.getFilters();
+      Multimap<String, String> filters = ArrayListMultimap.create();
+      if (filterMaps != null) {
+        for (Map.Entry<String, Collection<String>> entry : filterMaps.entrySet()) {
+          filters.putAll(entry.getKey(), entry.getValue());
+        }
+      }
+
+      // build site wide metric Urn
+      InputDataSpec.MetricAndDatasetName metricAndDatasetName =
+          new InputDataSpec.MetricAndDatasetName(spec.getSitewideMetricName(), spec.getSitewideCollection());
+      InputData data = this.dataFetcher.fetchData(
+          new InputDataSpec().withMetricNamesAndDatasetNames(Collections.singletonList(metricAndDatasetName)));
+      MetricConfigDTO metricConfigDTO = data.getMetricForMetricAndDatasetNames().get(metricAndDatasetName);
+      MetricEntity me = MetricEntity.fromMetric(1.0, metricConfigDTO.getId(), filters);
+      this.siteWideMetricUrn = me.getUrn();
+    }
   }
 
   private double getValueFromAggregates(MetricSlice slice, Map<MetricSlice, DataFrame> aggregates) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/SitewideImpactRuleAnomalyFilterSpec.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/SitewideImpactRuleAnomalyFilterSpec.java
index 7da2799dea..c0090fa618 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/SitewideImpactRuleAnomalyFilterSpec.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spec/SitewideImpactRuleAnomalyFilterSpec.java
@@ -1,10 +1,50 @@
 package com.linkedin.thirdeye.detection.spec;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
 public class SitewideImpactRuleAnomalyFilterSpec extends AbstractSpec {
   private String timezone = "UTC";
-  private String sitewideMetricUrn;
   private double threshold = Double.NaN;
-  private String offset = "wo1w";
+  private String offset;
+  private String pattern;
+  private String sitewideMetricName;
+  private String sitewideCollection;
+  private Map<String, Collection<String>> filters = new HashMap<>();
+
+  public String getSitewideMetricName() {
+    return sitewideMetricName;
+  }
+
+  public void setSitewideMetricName(String sitewideMetricName) {
+    this.sitewideMetricName = sitewideMetricName;
+  }
+
+  public String getSitewideCollection() {
+    return sitewideCollection;
+  }
+
+  public void setSitewideCollection(String sitewideCollection) {
+    this.sitewideCollection = sitewideCollection;
+  }
+
+  public Map<String, Collection<String>> getFilters() {
+    return filters;
+  }
+
+  public void setFilters(Map<String, Collection<String>> filters) {
+    this.filters = filters;
+  }
+
+  public String getPattern() {
+    return pattern;
+  }
+
+  public void setPattern(String pattern) {
+    this.pattern = pattern;
+  }
 
   public String getOffset() {
     return offset;
@@ -22,14 +62,6 @@ public void setTimezone(String timezone) {
     this.timezone = timezone;
   }
 
-  public String getSitewideMetricUrn() {
-    return sitewideMetricUrn;
-  }
-
-  public void setSitewideMetricUrn(String sitewideMetricUrn) {
-    this.sitewideMetricUrn = sitewideMetricUrn;
-  }
-
   public double getThreshold() {
     return threshold;
   }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Tunable.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Tunable.java
index 780cb2f339..e797fb3124 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Tunable.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/components/Tunable.java
@@ -23,7 +23,8 @@
 import org.joda.time.Interval;
 
 /**
- * The tunable. For tuning specs of each component.
+ * The tunable. For tuning specs of each component. Will be initialized with the user's input yaml for this
+ * component.
  */
 public interface Tunable<T extends AbstractSpec> extends BaseComponent<T> {
   /**
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputData.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputData.java
index adb51cc236..b07c99de35 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputData.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputData.java
@@ -23,6 +23,7 @@
 import com.linkedin.thirdeye.datalayer.dto.EventDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 
@@ -38,8 +39,19 @@
   final Multimap<EventSlice, EventDTO> events;
   final Map<Long, MetricConfigDTO> metrics;
   final Map<String, DatasetConfigDTO> datasets;
+
+  /**
+   * The data set config dtos for metric ids
+   * @see InputDataSpec#withMetricIdsForDataset(Collection)
+   */
   final Map<Long, DatasetConfigDTO> datasetForMetricId;
 
+  /**
+   * The metric config dtos for metric and data set names
+   * @see InputDataSpec#withMetricNamesAndDatasetNames(Collection)
+   */
+  final Map<InputDataSpec.MetricAndDatasetName, MetricConfigDTO> metricForMetricAndDatasetNames;
+
   public InputData(InputDataSpec spec, Map<MetricSlice, DataFrame> timeseries, Map<MetricSlice, DataFrame> aggregates,
       Multimap<AnomalySlice, MergedAnomalyResultDTO> anomalies, Multimap<EventSlice, EventDTO> events) {
     this.dataSpec = spec;
@@ -50,11 +62,13 @@ public InputData(InputDataSpec spec, Map<MetricSlice, DataFrame> timeseries, Map
     this.metrics = Collections.emptyMap();
     this.datasets = Collections.emptyMap();
     this.datasetForMetricId = Collections.emptyMap();
+    this.metricForMetricAndDatasetNames = Collections.emptyMap();
   }
 
   public InputData(InputDataSpec spec, Map<MetricSlice, DataFrame> timeseries, Map<MetricSlice, DataFrame> aggregates,
       Multimap<AnomalySlice, MergedAnomalyResultDTO> anomalies, Multimap<EventSlice, EventDTO> events,
-      Map<Long, MetricConfigDTO> metrics, Map<String, DatasetConfigDTO> datasets, Map<Long, DatasetConfigDTO> datasetForMetricId) {
+      Map<Long, MetricConfigDTO> metrics, Map<String, DatasetConfigDTO> datasets, Map<Long, DatasetConfigDTO> datasetForMetricId,
+      Map<InputDataSpec.MetricAndDatasetName, MetricConfigDTO> metricForMetricAndDatasetNames) {
     this.dataSpec = spec;
     this.timeseries = timeseries;
     this.aggregates = aggregates;
@@ -63,6 +77,7 @@ public InputData(InputDataSpec spec, Map<MetricSlice, DataFrame> timeseries, Map
     this.metrics = metrics;
     this.datasets = datasets;
     this.datasetForMetricId = datasetForMetricId;
+    this.metricForMetricAndDatasetNames = metricForMetricAndDatasetNames;
   }
 
   public InputDataSpec getDataSpec() {
@@ -96,4 +111,8 @@ public InputDataSpec getDataSpec() {
   public Map<Long, DatasetConfigDTO> getDatasetForMetricId(){
     return datasetForMetricId;
   }
+
+  public Map<InputDataSpec.MetricAndDatasetName, MetricConfigDTO> getMetricForMetricAndDatasetNames() {
+    return metricForMetricAndDatasetNames;
+  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputDataSpec.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputDataSpec.java
index 9bd4304335..18d8ae3aba 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputDataSpec.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/spi/model/InputDataSpec.java
@@ -64,6 +64,29 @@
   */
   final Collection<Long> metricIdsForDatasets;
 
+  /*
+    Metric name and data set name to fetch the MetricConfigDTO for.
+  */
+  final Collection<MetricAndDatasetName> metricAndDatasetNames;
+
+  public static class MetricAndDatasetName {
+    private final String metricName;
+    private final String datasetName;
+
+    public MetricAndDatasetName(String metricName, String datasetName) {
+      this.metricName = metricName;
+      this.datasetName = datasetName;
+    }
+
+    public String getMetricName() {
+      return metricName;
+    }
+
+    public String getDatasetName() {
+      return datasetName;
+    }
+  }
+
   public InputDataSpec() {
     this.timeseriesSlices = Collections.emptyList();
     this.aggregateSlices = Collections.emptyList();
@@ -72,11 +95,12 @@ public InputDataSpec() {
     this.metricIds = Collections.emptyList();
     this.datasetNames = Collections.emptyList();
     this.metricIdsForDatasets = Collections.emptyList();
+    this.metricAndDatasetNames = Collections.emptyList();
   }
 
   public InputDataSpec(Collection<MetricSlice> timeseriesSlices, Collection<MetricSlice> aggregateSlices,
       Collection<AnomalySlice> anomalySlices, Collection<EventSlice> eventSlices, Collection<Long> metricIds, Collection<String> datasetNames,
-      Collection<Long> metricIdsForDatasets) {
+      Collection<Long> metricIdsForDatasets, Collection<MetricAndDatasetName> metricAndDatasetNames) {
     this.timeseriesSlices = timeseriesSlices;
     this.aggregateSlices = aggregateSlices;
     this.anomalySlices = anomalySlices;
@@ -84,6 +108,7 @@ public InputDataSpec(Collection<MetricSlice> timeseriesSlices, Collection<Metric
     this.metricIds = metricIds;
     this.datasetNames = datasetNames;
     this.metricIdsForDatasets = metricIdsForDatasets;
+    this.metricAndDatasetNames = metricAndDatasetNames;
   }
 
   public Collection<MetricSlice> getTimeseriesSlices() {
@@ -114,32 +139,39 @@ public InputDataSpec(Collection<MetricSlice> timeseriesSlices, Collection<Metric
     return metricIdsForDatasets;
   }
 
+  public Collection<MetricAndDatasetName> getMetricAndDatasetNames() {
+    return metricAndDatasetNames;
+  }
+
   public InputDataSpec withTimeseriesSlices(Collection<MetricSlice> timeseriesSlices) {
-    return new InputDataSpec(timeseriesSlices, this.aggregateSlices, this.anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets);
+    return new InputDataSpec(timeseriesSlices, this.aggregateSlices, this.anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets, this.metricAndDatasetNames);
   }
 
   public InputDataSpec withAggregateSlices(Collection<MetricSlice> aggregateSlices) {
-    return new InputDataSpec(this.timeseriesSlices, aggregateSlices, this.anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets);
+    return new InputDataSpec(this.timeseriesSlices, aggregateSlices, this.anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets, this.metricAndDatasetNames);
   }
 
   public InputDataSpec withAnomalySlices(Collection<AnomalySlice> anomalySlices) {
-    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets);
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, anomalySlices, this.eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets, this.metricAndDatasetNames);
   }
 
   public InputDataSpec withEventSlices(Collection<EventSlice> eventSlices) {
-    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets);
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets, this.metricAndDatasetNames);
   }
 
   public InputDataSpec withMetricIds(Collection<Long> metricIds) {
-    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, metricIds, this.datasetNames, this.metricIdsForDatasets);
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, metricIds, this.datasetNames, this.metricIdsForDatasets, this.metricAndDatasetNames);
   }
 
   public InputDataSpec withDatasetNames(Collection<String> datasetNames) {
-    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, this.metricIds, datasetNames, this.metricIdsForDatasets);
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, this.metricIds, datasetNames, this.metricIdsForDatasets, this.metricAndDatasetNames);
   }
 
   public InputDataSpec withMetricIdsForDataset(Collection<Long> metricIdsForDatasets) {
-    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, this.metricIds, this.datasetNames, metricIdsForDatasets);
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, this.metricIds, this.datasetNames, metricIdsForDatasets, this.metricAndDatasetNames);
+  }
 
+  public InputDataSpec withMetricNamesAndDatasetNames(Collection<MetricAndDatasetName> metricNameAndDatasetNames) {
+    return new InputDataSpec(this.timeseriesSlices, this.aggregateSlices, this.anomalySlices, eventSlices, this.metricIds, this.datasetNames, this.metricIdsForDatasets, metricNameAndDatasetNames);
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 8834199d34..4094ba9b49 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -133,6 +133,7 @@
   private final Map<String, Object> components = new HashMap<>();
   private MetricConfigDTO metricConfig;
   private DatasetConfigDTO datasetConfig;
+  private String metricUrn;
 
   public CompositePipelineConfigTranslator(Map<String, Object> yamlConfig, DataProvider provider) {
     super(yamlConfig, provider);
@@ -148,7 +149,7 @@ YamlTranslationResult translateYaml() {
         .get(metricConfig.getDataset());
     Preconditions.checkNotNull(this.datasetConfig, "dataset not found");
 
-    String metricUrn = buildMetricUrn(yamlConfig);
+    this.metricUrn = buildMetricUrn(yamlConfig);
     String cron = buildCron();
 
     List<Map<String, Object>> ruleYamls = getList(yamlConfig.get(PROP_RULES));
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilterTest.java
index fb9f57ae89..cbe1b5ba29 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/components/SitewideImpactRuleAnomalyFilterTest.java
@@ -43,7 +43,6 @@
 public class SitewideImpactRuleAnomalyFilterTest {
   private static final String METRIC_URN = "thirdeye:metric:123";
 
-  private List<MergedAnomalyResultDTO> anomalies;
   private DataProvider testDataProvider;
   private Baseline baseline;
 
@@ -62,8 +61,6 @@ public void beforeMethod() {
     aggregates.put(slice2, new DataFrame().addSeries(COL_VALUE, 500));
     aggregates.put(baselineSlice2, new DataFrame().addSeries(COL_VALUE, 1000));
 
-    this.anomalies = Arrays.asList(makeAnomaly(0, 2), makeAnomaly(4, 6));
-
     this.testDataProvider = new MockDataProvider().setAggregates(aggregates);
   }
 
@@ -72,11 +69,12 @@ public void testSiteWideImpactFilter() {
     SitewideImpactRuleAnomalyFilterSpec spec = new SitewideImpactRuleAnomalyFilterSpec();
     spec.setThreshold(0.5);
     spec.setOffset("median3w");
+    spec.setPattern("down");
     SitewideImpactRuleAnomalyFilter filter = new SitewideImpactRuleAnomalyFilter();
     filter.init(spec, new DefaultInputDataFetcher(this.testDataProvider, 125L));
 
     List<Boolean> results =
-        this.anomalies.stream().map(anomaly -> filter.isQualified(anomaly)).collect(Collectors.toList());
+        Arrays.asList(makeAnomaly(0, 2), makeAnomaly(4, 6)).stream().map(anomaly -> filter.isQualified(anomaly)).collect(Collectors.toList());
     Assert.assertEquals(results, Arrays.asList(false, true));
   }
 
@@ -86,4 +84,22 @@ private static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
     anomaly.setMetricUrn(METRIC_URN);
     return anomaly;
   }
+
+  @Test
+  public void testSiteWideImpactFilterNoOffset() {
+    SitewideImpactRuleAnomalyFilterSpec spec = new SitewideImpactRuleAnomalyFilterSpec();
+    spec.setThreshold(0.5);
+    spec.setPattern("down");
+    SitewideImpactRuleAnomalyFilter filter = new SitewideImpactRuleAnomalyFilter();
+    filter.init(spec, new DefaultInputDataFetcher(this.testDataProvider, 125L));
+    List<MergedAnomalyResultDTO> anomalyResultDTOs = Arrays.asList(makeAnomaly(0, 2), makeAnomaly(4, 6));
+    anomalyResultDTOs.get(0).setAvgCurrentVal(150);
+    anomalyResultDTOs.get(0).setAvgBaselineVal(200);
+    anomalyResultDTOs.get(1).setAvgCurrentVal(500);
+    anomalyResultDTOs.get(1).setAvgBaselineVal(1000);
+
+    List<Boolean> results =
+        anomalyResultDTOs.stream().map(anomaly -> filter.isQualified(anomaly)).collect(Collectors.toList());
+    Assert.assertEquals(results, Arrays.asList(false, true));
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pinot.apache.org
For additional commands, e-mail: dev-help@pinot.apache.org