You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2018/12/06 18:18:34 UTC

[incubator-pinot] branch master updated: [TE] detection pipeline - multiple improvements (#3586)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new dcdf5d4  [TE] detection pipeline - multiple improvements (#3586)
dcdf5d4 is described below

commit dcdf5d4f92e2a910c8c118151c4a28af16a0a40a
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Thu Dec 6 10:18:28 2018 -0800

    [TE] detection pipeline - multiple improvements (#3586)
    
    - Change the pipeline baseline and predicted values filling so that it's consistent with RCA
    - Replay moving window
    - Replay endpoint for the new pipeline with legacy behaviors
    - Add window size, window unit, WoW rule, min-max threshold migration to YAML migration endpoint
    - Baseline filling merger handles anomalies per detector per dimension
    - Baseline filling merger injects detector to the next level detector wrapper
    - Other fixes
    - Tests
---
 .../anomaly/detection/DetectionTaskRunner.java     |   2 +-
 .../bao/jdbc/MergedAnomalyResultManagerImpl.java   |   3 +
 .../detection/DetectionMigrationResource.java      |  56 +++++++-
 .../thirdeye/detection/DetectionResource.java      | 159 ++++++++++++++++++++-
 .../thirdeye/detection/DetectionUtils.java         |   4 +-
 .../thirdeye/detection/algorithm/MergeWrapper.java |  15 +-
 .../detection/wrapper/AnomalyDetectorWrapper.java  |  59 ++++++--
 .../wrapper/BaselineFillingMergeWrapper.java       |  72 ++++++++--
 .../wrapper/ChildKeepingMergeWrapper.java          |   4 +-
 .../yaml/CompositePipelineConfigTranslator.java    |   6 +-
 .../wrapper/AnomalyDetectorWrapperTest.java        |  16 ++-
 .../wrapper/BaselineFillingMergeWrapperTest.java   |  10 +-
 .../compositePipelineTranslatorTestResult-1.json   |   8 +-
 .../compositePipelineTranslatorTestResult-2.json   |   6 +-
 14 files changed, 368 insertions(+), 52 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java
index 57d7fb5..30be06f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java
@@ -259,7 +259,7 @@ public class DetectionTaskRunner implements TaskRunner {
       historyMergedAnomalies = Collections.emptyList();
     }
 
-    LOG.info("Analyzing anomaly function with explored dimensions: {}, windowStart: {}, windowEnd: {}",
+    LOG.info("[Old pipeline] Analyzing anomaly function with explored dimensions: {}, windowStart: {}, windowEnd: {}",
         dimensionMap, windowStart, windowEnd);
 
     AnomalyUtils.logAnomaliesOverlapWithWindow(windowStart, windowEnd, historyMergedAnomalies);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
index 42cbc62..aad718a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
@@ -405,6 +405,9 @@ public class MergedAnomalyResultManagerImpl extends AbstractManagerImpl<MergedAn
       for (MergedAnomalyResultDTO child : entity.getChildren()) {
         if (child.getId() == null) {
           // only allow single level to prevent cycles
+          if (child == entity){
+            throw new IllegalArgumentException("Cannot contain itself as child anomaly");
+          }
           if (child.getChildren() != null && !child.getChildren().isEmpty()) {
             throw new IllegalArgumentException("Multi-level anomaly nesting not supported");
           }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
index d4035aa..c53af49 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
@@ -28,6 +28,7 @@ import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.detector.email.filter.AlertFilterFactory;
 import com.linkedin.thirdeye.detector.function.AnomalyFunctionFactory;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -59,6 +60,8 @@ public class DetectionMigrationResource {
   private static final Logger LOGGER = LoggerFactory.getLogger(DetectionMigrationResource.class);
   private static final String PROP_WINDOW_DELAY = "windowDelay";
   private static final String PROP_WINDOW_DELAY_UNIT = "windowDelayUnit";
+  private static final String PROP_WINDOW_SIZE = "windowSize";
+  private static final String PROP_WINDOW_UNIT = "windowUnit";
 
   private final LegacyAnomalyFunctionTranslator translator;
   private final AnomalyFunctionManager anomalyFunctionDAO;
@@ -115,8 +118,21 @@ public class DetectionMigrationResource {
     ruleYaml.put("name", "myRule");
 
     // detection
-    ruleYaml.put("detection", Collections.singletonList(
-        ImmutableMap.of("type", "ALGORITHM", "params", getAlgorithmDetectorParams(anomalyFunctionDTO))));
+    if (anomalyFunctionDTO.getType().equals("WEEK_OVER_WEEK_RULE")){
+      // wo1w change detector
+      ruleYaml.put("detection", Collections.singletonList(ImmutableMap.of("type", "PERCENTAGE_RULE",
+          "params", getPercentageChangeRuleDetectorParams(anomalyFunctionDTO))));
+    } else if (anomalyFunctionDTO.getType().equals("MIN_MAX_THRESHOLD")){
+      // threshold detector
+      ruleYaml.put("detection", Collections.singletonList(ImmutableMap.of("type", "THRESHOLD",
+          "params", getMinMaxThresholdRuleDetectorParams(anomalyFunctionDTO))));
+    } else{
+      // algorithm detector
+      ruleYaml.put("detection", Collections.singletonList(
+          ImmutableMap.of("type", "ALGORITHM", "params", getAlgorithmDetectorParams(anomalyFunctionDTO),
+              PROP_WINDOW_SIZE, anomalyFunctionDTO.getWindowSize(),
+              PROP_WINDOW_UNIT, anomalyFunctionDTO.getWindowUnit().toString())));
+    }
 
     // filters
     Map<String, String> alertFilter = anomalyFunctionDTO.getAlertFilter();
@@ -157,7 +173,7 @@ public class DetectionMigrationResource {
     return this.yaml.dump(yamlConfigs);
   }
 
-  private Map<String, Object> getDimensionExplorationParams(AnomalyFunctionDTO functionDTO) {
+  private Map<String, Object> getDimensionExplorationParams(AnomalyFunctionDTO functionDTO) throws IOException {
     Map<String, Object> dimensionExploreYaml = new LinkedHashMap<>();
     dimensionExploreYaml.put("dimensions", Collections.singletonList(functionDTO.getExploreDimensions()));
     if (functionDTO.getDataFilter() != null && !functionDTO.getDataFilter().isEmpty() && functionDTO.getDataFilter().get("type").equals("average_threshold")) {
@@ -166,6 +182,13 @@ public class DetectionMigrationResource {
       dimensionExploreYaml.put("minValue", Double.valueOf(functionDTO.getDataFilter().get("threshold")));
       dimensionExploreYaml.put("minLiveZone", functionDTO.getDataFilter().get("minLiveZone"));
     }
+    if (functionDTO.getType().equals("MIN_MAX_THRESHOLD")){
+      // migrate volume threshold
+      Properties properties = AnomalyFunctionDTO.toProperties(functionDTO.getProperties());
+      if (properties.containsKey("averageVolumeThreshold")){
+        dimensionExploreYaml.put("minValue", properties.getProperty("averageVolumeThreshold"));
+      }
+    }
     return dimensionExploreYaml;
   }
 
@@ -208,6 +231,33 @@ public class DetectionMigrationResource {
     return new Period(TimeUnit.MILLISECONDS.convert(functionDTO.getBucketSize(), functionDTO.getBucketUnit())).toString();
   }
 
+  /**
+   * Builds the PERCENTAGE_RULE detector params from the legacy "changeThreshold" property; empty if the property is absent.
+   */
+  private Map<String, Object> getPercentageChangeRuleDetectorParams(AnomalyFunctionDTO functionDTO) throws IOException {
+    Map<String, Object> detectorYaml = new LinkedHashMap<>();
+    Properties properties = AnomalyFunctionDTO.toProperties(functionDTO.getProperties());
+    if (properties.containsKey("changeThreshold")){
+      // parse only after the key is known to exist: parsing a missing (null) value throws NullPointerException
+      double threshold = Double.parseDouble(properties.getProperty("changeThreshold"));
+      detectorYaml.put("percentageChange", Math.abs(threshold));
+      // the sign of the legacy threshold selects the direction of change to detect
+      detectorYaml.put("pattern", threshold > 0 ? "UP" : "DOWN");
+    }
+    return detectorYaml;
+  }
+
+  private Map<String, Object> getMinMaxThresholdRuleDetectorParams(AnomalyFunctionDTO functionDTO) throws IOException {
+    // copy the legacy "min"/"max" properties, when present, into the THRESHOLD detector params
+    Properties legacyProps = AnomalyFunctionDTO.toProperties(functionDTO.getProperties());
+    Map<String, Object> thresholdParams = new LinkedHashMap<>();
+    for (String bound : new String[] {"min", "max"}) {
+      if (legacyProps.containsKey(bound)) {
+        thresholdParams.put(bound, legacyProps.getProperty(bound));
+      }
+    }
+    return thresholdParams;
+  }
+
   private Map<String, Object> getAlgorithmDetectorParams(AnomalyFunctionDTO functionDTO) throws Exception {
     Map<String, Object> detectorYaml = new LinkedHashMap<>();
     Map<String, Object> params = new LinkedHashMap<>();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
index a51a3a6..dd63d06 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
@@ -17,6 +17,7 @@
 package com.linkedin.thirdeye.detection;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.linkedin.thirdeye.anomaly.detection.DetectionJobSchedulerUtils;
 import com.linkedin.thirdeye.constant.AnomalyResultSource;
 import com.linkedin.thirdeye.datalayer.bao.DatasetConfigManager;
 import com.linkedin.thirdeye.datalayer.bao.DetectionConfigManager;
@@ -35,9 +36,11 @@ import com.linkedin.thirdeye.detection.finetune.GridSearchTuningAlgorithm;
 import com.linkedin.thirdeye.detection.finetune.TuningAlgorithm;
 import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import com.wordnik.swagger.annotations.ApiParam;
+import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,7 +54,9 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.commons.collections.MapUtils;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
+import org.quartz.CronExpression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +66,9 @@ import org.slf4j.LoggerFactory;
 public class DetectionResource {
   private static final Logger LOG = LoggerFactory.getLogger(DetectionResource.class);
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final String DAILY_CRON = "0 0 14 * * ? *";
+  private static final String HOURLY_CRON = "0 0 * * * ? *";
+  private static final String MINUTE_CRON = "0 0/15 * * * ? *";
 
   private final MetricConfigManager metricDAO;
   private final DatasetConfigManager datasetDAO;
@@ -164,11 +172,102 @@ public class DetectionResource {
     return Response.ok(result).build();
   }
 
+  // default replay bucket size in millis, keyed on the detection's cron schedule.
+  private long getBucketSize(DetectionConfigDTO config){
+    final String cron = config.getCron();
+    switch (cron) {
+      case DAILY_CRON:
+        return TimeUnit.DAYS.toMillis(1);       // daily: one day
+      case HOURLY_CRON:
+        return TimeUnit.HOURS.toMillis(1);      // hourly: one hour
+      case MINUTE_CRON:
+        return TimeUnit.MINUTES.toMillis(15);   // every 15 minutes: 15 minutes
+      default:
+        // any other schedule has no default bucket size
+        throw new IllegalArgumentException("bucket size not determined");
+    }
+  }
+
+  // default replay window size in millis, keyed on the detection's cron schedule.
+  private long getWindowSize(DetectionConfigDTO config) {
+    final String cron = config.getCron();
+    switch (cron) {
+      case DAILY_CRON:
+        return TimeUnit.DAYS.toMillis(1);     // daily: one day
+      case HOURLY_CRON:
+        return TimeUnit.HOURS.toMillis(24);   // hourly: 24 hours
+      case MINUTE_CRON:
+        return TimeUnit.HOURS.toMillis(6);    // every 15 minutes: 6 hours
+      default:
+        // any other schedule has no default window size
+        throw new IllegalArgumentException("window size not determined");
+    }
+  }
+
+  /*
+  Generates the replay monitoring windows between start and end (epoch millis), with window boundaries taken from the config's cron schedule. A null windowSize or bucketSize falls back to the cron-based default.
+   */
+  private List<Interval> getReplayMonitoringWindows(DetectionConfigDTO config, long start, long end, Long windowSize, Long bucketSize) throws ParseException {
+    List<Interval> monitoringWindows = new ArrayList<>();
+    CronExpression cronExpression = new CronExpression(config.getCron());
+    DateTime currentStart = new DateTime(start);
+
+    long legacyWindowSize;
+    if(windowSize == null){
+      legacyWindowSize = getWindowSize(config);
+      LOG.warn("[Legacy replay] window size not set when replay {}. Use default window size {}", config.getId(), legacyWindowSize);
+    } else {
+      legacyWindowSize = windowSize;
+    }
+
+    long legacyBucketSize;
+    if (bucketSize == null){
+      legacyBucketSize = getBucketSize(config);
+      LOG.warn("[Legacy replay] bucket size not set when replay {}. Use default bucket size {}", config.getId(), legacyBucketSize);
+    } else {
+      legacyBucketSize = bucketSize;
+    }
+
+    // add offsets so that it would replay all the moving windows within start time and end time
+    currentStart = currentStart.plus(legacyWindowSize).minus(legacyBucketSize);
+    if (config.getCron().equals(DAILY_CRON)){
+      // daily detection offset by 1 to pick up the first moving window. DateTime is immutable, so the result must be assigned back
+      currentStart = currentStart.minus(1L);
+    }
+
+    DateTime currentEnd = new DateTime(cronExpression.getNextValidTimeAfter(currentStart.toDate()));
+    DateTime endBoundary = new DateTime(cronExpression.getNextValidTimeAfter(new Date(end)));
+    while (currentEnd.isBefore(endBoundary)) {
+      monitoringWindows.add(new Interval(currentStart.getMillis(), currentEnd.getMillis()));
+      currentStart = currentEnd;
+      currentEnd = new DateTime(cronExpression.getNextValidTimeAfter(currentStart.toDate()));
+    }
+    return monitoringWindows;
+  }
+
+  /**
+   * Legacy replay endpoint. Replay all the moving windows within start time and end time.
+   * Saves anomaly for each moving window before starting detection for next window.
+   * Behaves exactly like the legacy replay endpoint.
+   * See also {@link com.linkedin.thirdeye.dashboard.resources.DetectionJobResource#generateAnomaliesInRangeForFunctions(String, String, String, String, Boolean, Boolean)}
+   * @param configId the config id to replay
+   * @param start start time
+   * @param end end time
+   * @param deleteExistingAnomaly (optional) delete existing anomaly or not
+   * @param windowSize (optional) override the default window size
+   * @param bucketSize (optional) override the default bucket size
+   * @return anomalies
+   * @throws Exception
+   */
   @POST
-  @Path("/replay/{id}")
-  public Response detectionReplay(@PathParam("id") long configId, @QueryParam("start") long start,
+  @Path("/legacy-replay/{id}")
+  public Response legacyReplay(
+      @PathParam("id") long configId,
+      @QueryParam("start") long start,
       @QueryParam("end") long end,
-      @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean deleteExistingAnomaly) throws Exception {
+      @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean deleteExistingAnomaly,
+      @QueryParam("windowSize") Long windowSize,
+      @QueryParam("bucketSize") Long bucketSize) throws Exception {
 
     DetectionConfigDTO config = this.configDAO.findById(configId);
     if (config == null) {
@@ -189,6 +288,57 @@ public class DetectionResource {
     }
 
     // execute replay
+    List<Interval> monitoringWindows = getReplayMonitoringWindows(config, start, end, windowSize, bucketSize);
+    for (Interval monitoringWindow : monitoringWindows){
+      DetectionPipeline pipeline = this.loader.from(this.provider, config, monitoringWindow.getStartMillis(), monitoringWindow.getEndMillis());
+      DetectionPipelineResult result = pipeline.run();
+
+      // save state
+      if (result.getLastTimestamp() > 0) {
+        config.setLastTimestamp(result.getLastTimestamp());
+      }
+
+      this.configDAO.update(config);
+
+      for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
+        anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
+        this.anomalyDAO.save(anomaly);
+      }
+    }
+
+    Collection<MergedAnomalyResultDTO> replayResult = this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+
+    LOG.info("replay detection pipeline {} generated {} anomalies.", config.getId(), replayResult.size());
+    return Response.ok(replayResult).build();
+  }
+
+  /**
+   * Replay for a given time range. Without cron schedule behavior
+   */
+  @POST
+  @Path("/replay/{id}")
+  public Response detectionReplay(
+      @PathParam("id") long configId,
+      @QueryParam("start") long start,
+      @QueryParam("end") long end) throws Exception {
+
+    DetectionConfigDTO config = this.configDAO.findById(configId);
+    if (config == null) {
+      throw new IllegalArgumentException(String.format("Cannot find config %d", configId));
+    }
+
+    // clear existing anomalies
+    AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
+    Collection<MergedAnomalyResultDTO> existing = this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+
+    List<Long> existingIds = new ArrayList<>();
+    for (MergedAnomalyResultDTO anomaly : existing) {
+      existingIds.add(anomaly.getId());
+    }
+
+    this.anomalyDAO.deleteByIds(existingIds);
+
+    // execute replay
     DetectionPipeline pipeline = this.loader.from(this.provider, config, start, end);
     DetectionPipelineResult result = pipeline.run();
 
@@ -204,7 +354,6 @@ public class DetectionResource {
       this.anomalyDAO.save(anomaly);
     }
 
-    LOG.info("replay detection pipeline {} generated {} anomalies.", config.getId(), result.getAnomalies().size());
-    return Response.ok(result.getAnomalies()).build();
+    return Response.ok(result).build();
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
index f1bf20d..5f80b10 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
@@ -65,8 +65,10 @@ public class DetectionUtils {
   }
 
   // get the component name from the reference key
+  // example "$myRule:ALGORITHM:0" -> "myRule:ALGORITHM:0"
   public static String getComponentName(String key) {
-    return key.substring(1);
+    if (isReferenceName(key)) return key.substring(1);
+    else throw new IllegalArgumentException("not a component reference key. should starts with $");
   }
 
   // get the spec class name for a component class
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
index 43f9c27..19b13fb 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
@@ -46,7 +46,7 @@ public class MergeWrapper extends DetectionPipeline {
   private static final String PROP_NESTED = "nested";
   private static final String PROP_CLASS_NAME = "className";
   private static final String PROP_MERGE_KEY = "mergeKey";
-  private static final String PROP_DETECTOR_COMPONENT_KEY = "detectorComponentKey";
+  private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
 
   protected static final Comparator<MergedAnomalyResultDTO> COMPARATOR = new Comparator<MergedAnomalyResultDTO>() {
     @Override
@@ -67,7 +67,7 @@ public class MergeWrapper extends DetectionPipeline {
   protected final List<Map<String, Object>> nestedProperties;
   protected final long maxGap; // max time gap for merge
   protected final long maxDuration; // max overall duration of merged anomaly
-  private final AnomalySlice slice;
+  protected final AnomalySlice slice;
 
   /**
    * Instantiates a new merge wrapper.
@@ -181,7 +181,7 @@ public class MergeWrapper extends DetectionPipeline {
     return output;
   }
 
-  private long getStartTime(Iterable<MergedAnomalyResultDTO> anomalies) {
+  protected long getStartTime(Iterable<MergedAnomalyResultDTO> anomalies) {
     long time = this.startTime;
     for (MergedAnomalyResultDTO anomaly : anomalies) {
       time = Math.min(anomaly.getStartTime(), time);
@@ -189,7 +189,7 @@ public class MergeWrapper extends DetectionPipeline {
     return time;
   }
 
-  private long getEndTime(Iterable<MergedAnomalyResultDTO> anomalies) {
+  protected long getEndTime(Iterable<MergedAnomalyResultDTO> anomalies) {
     long time = this.endTime;
     for (MergedAnomalyResultDTO anomaly : anomalies) {
       time = Math.max(anomaly.getEndTime(), time);
@@ -214,7 +214,7 @@ public class MergeWrapper extends DetectionPipeline {
 
     public static AnomalyKey from(MergedAnomalyResultDTO anomaly) {
       return new AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), anomaly.getProperties().get(PROP_MERGE_KEY),
-          anomaly.getProperties().get(PROP_DETECTOR_COMPONENT_KEY));
+          anomaly.getProperties().get(PROP_DETECTOR_COMPONENT_NAME));
     }
 
     @Override
@@ -227,12 +227,13 @@ public class MergeWrapper extends DetectionPipeline {
       }
       AnomalyKey that = (AnomalyKey) o;
       return Objects.equals(metric, that.metric) && Objects.equals(collection, that.collection) && Objects.equals(
-          dimensions, that.dimensions) && Objects.equals(mergeKey, that.mergeKey);
+          dimensions, that.dimensions) && Objects.equals(mergeKey, that.mergeKey) && Objects.equals(componentKey,
+          that.componentKey);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(metric, collection, dimensions, mergeKey);
+      return Objects.hash(metric, collection, dimensions, mergeKey, componentKey);
     }
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index a650550..0374892 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -18,8 +18,10 @@ package com.linkedin.thirdeye.detection.wrapper;
 
 import com.google.common.base.Preconditions;
 import com.linkedin.thirdeye.anomaly.detection.DetectionJobSchedulerUtils;
+import com.linkedin.thirdeye.anomalydetection.context.AnomalyResult;
 import com.linkedin.thirdeye.api.TimeGranularity;
 import com.linkedin.thirdeye.api.TimeSpec;
+import com.linkedin.thirdeye.datalayer.dto.AnomalyFunctionDTO;
 import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
@@ -61,7 +63,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
   private static final String PROP_WINDOW_UNIT = "windowUnit";
   private static final String PROP_FREQUENCY = "frequency";
   private static final String PROP_DETECTOR = "detector";
-  private static final String PROP_DETECTOR_COMPONENT_KEY = "detectorComponentKey";
+  private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
+  private static final String PROP_TIMEZONE = "timezone";
 
   private static final Logger LOG = LoggerFactory.getLogger(
       AnomalyDetectorWrapper.class);
@@ -76,11 +79,13 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
   private final MetricConfigDTO metric;
   private final MetricEntity metricEntity;
   private final boolean isMovingWindowDetection;
-  private DatasetConfigDTO dataset;
-  private DateTimeZone dateTimeZone;
   // need to specify run frequency for minute level detection. Used for moving monitoring window alignment, default to be 15 minutes.
   private final TimeGranularity functionFrequency;
-  private final String detectorReferenceKey;
+  private final String detectorName;
+  private final long windowSizeMillis;
+  private final DatasetConfigDTO dataset;
+  private final DateTimeZone dateTimeZone;
+
 
   public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
     super(provider, config, startTime, endTime);
@@ -90,19 +95,30 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     this.metric = provider.fetchMetrics(Collections.singleton(this.metricEntity.getId())).get(this.metricEntity.getId());
 
     Preconditions.checkArgument(this.config.getProperties().containsKey(PROP_DETECTOR));
-    this.detectorReferenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_DETECTOR));
-    Preconditions.checkArgument(this.config.getComponents().containsKey(this.detectorReferenceKey));
-    this.anomalyDetector = (AnomalyDetector) this.config.getComponents().get(this.detectorReferenceKey);
+    this.detectorName = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_DETECTOR));
+    Preconditions.checkArgument(this.config.getComponents().containsKey(this.detectorName));
+    this.anomalyDetector = (AnomalyDetector) this.config.getComponents().get(this.detectorName);
 
+    // emulate moving window or not
     this.isMovingWindowDetection = MapUtils.getBooleanValue(config.getProperties(), PROP_MOVING_WINDOW_DETECTION, false);
     // delays to wait for data becomes available
     this.windowDelay = MapUtils.getIntValue(config.getProperties(), PROP_WINDOW_DELAY, 0);
+    // window delay unit
     this.windowDelayUnit = TimeUnit.valueOf(MapUtils.getString(config.getProperties(), PROP_WINDOW_DELAY_UNIT, "DAYS"));
     // detection window size
     this.windowSize = MapUtils.getIntValue(config.getProperties(), PROP_WINDOW_SIZE, 1);
     this.windowUnit = TimeUnit.valueOf(MapUtils.getString(config.getProperties(), PROP_WINDOW_UNIT, "DAYS"));
+    this.windowSizeMillis = TimeUnit.MILLISECONDS.convert(windowSize, windowUnit);
+    // run frequency, used to determine moving windows for minute-level detection
     Map<String, Object> frequency = MapUtils.getMap(config.getProperties(), PROP_FREQUENCY, Collections.emptyMap());
     this.functionFrequency = new TimeGranularity(MapUtils.getIntValue(frequency, "size", 15), TimeUnit.valueOf(MapUtils.getString(frequency, "unit", "MINUTES")));
+
+    MetricEntity me = MetricEntity.fromURN(this.metricUrn);
+    MetricConfigDTO metricConfigDTO = this.provider.fetchMetrics(Collections.singletonList(me.getId())).get(me.getId());
+    this.dataset = this.provider.fetchDatasets(Collections.singletonList(metricConfigDTO.getDataset()))
+        .get(metricConfigDTO.getDataset());
+    // date time zone for moving windows. read from the "timezone" property, defaulting to America/Los_Angeles
+    this.dateTimeZone = DateTimeZone.forID(MapUtils.getString(config.getProperties(), PROP_TIMEZONE, "America/Los_Angeles"));
   }
 
   @Override
@@ -112,6 +128,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     for (Interval window : monitoringWindows) {
       List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>();
       try {
+        LOG.info("[New Pipeline] running detection for config {} metricUrn {}. start time {}, end time{}", config.getId(), metricUrn, window.getStart(), window.getEnd());
         anomaliesForOneWindow = anomalyDetector.runDetection(window, this.metricUrn);
       } catch (Exception e) {
         LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
@@ -125,27 +142,25 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
       anomaly.setMetric(this.metric.getName());
       anomaly.setCollection(this.metric.getDataset());
       anomaly.setDimensions(DetectionUtils.toFilterMap(this.metricEntity.getFilters()));
-      anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_KEY, this.detectorReferenceKey);
+      anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_NAME, this.detectorName);
     }
     return new DetectionPipelineResult(anomalies);
   }
 
+  // get a list of the monitoring window, if no sliding window used, use start time and end time as window
   List<Interval> getMonitoringWindows() {
     if (this.isMovingWindowDetection) {
       try{
         List<Interval> monitoringWindows = new ArrayList<>();
-        MetricEntity me = MetricEntity.fromURN(this.metricUrn);
-        MetricConfigDTO metricConfigDTO =
-            this.provider.fetchMetrics(Collections.singletonList(me.getId())).get(me.getId());
-        dataset = this.provider.fetchDatasets(Collections.singletonList(metricConfigDTO.getDataset()))
-            .get(metricConfigDTO.getDataset());
-        dateTimeZone = DateTimeZone.forID(dataset.getTimezone());
         List<Long> monitoringWindowEndTimes = getMonitoringWindowEndTimes();
         for (long monitoringEndTime : monitoringWindowEndTimes) {
           long endTime = monitoringEndTime - TimeUnit.MILLISECONDS.convert(windowDelay, windowDelayUnit);
-          long startTime = endTime - TimeUnit.MILLISECONDS.convert(windowSize, windowUnit);
+          long startTime = endTime - this.windowSizeMillis;
           monitoringWindows.add(new Interval(startTime, endTime, dateTimeZone));
         }
+        for (Interval window : monitoringWindows){
+          LOG.info("running detections in windows {}", window);
+        }
         return monitoringWindows;
       } catch (Exception e) {
         LOG.info("can't generate moving monitoring windows, calling with single detection window", e);
@@ -154,6 +169,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     return Collections.singletonList(new Interval(startTime, endTime));
   }
 
+  // get the list of monitoring window end times
   private List<Long> getMonitoringWindowEndTimes() {
     List<Long> endTimes = new ArrayList<>();
 
@@ -169,6 +185,13 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     return endTimes;
   }
 
+  /**
+   * round this time to earlier boundary, depending on granularity of dataset
+   * e.g. 12:15pm on HOURLY dataset should be treated as 12pm
+   * any dataset with granularity finer than HOUR, will be rounded as per function frequency (assumption is that this is in MINUTES)
+   * so 12.53 on 5 MINUTES dataset, with function frequency 15 MINUTES will be rounded to 12.45
+   * See also {@link DetectionJobSchedulerUtils#getBoundaryAlignedTimeForDataset(DateTime, TimeUnit)}
+   */
   private long getBoundaryAlignedTimeForDataset(DateTime currentTime) {
     TimeSpec timeSpec = ThirdEyeUtils.getTimeSpecFromDatasetConfig(dataset);
     TimeUnit dataUnit = timeSpec.getDataGranularity().getUnit();
@@ -192,6 +215,12 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     return currentTime.getMillis();
   }
 
+  /**
+   * Get the bucket size as a {@link Period}, according to the data granularity of the dataset.
+   * Bucket size is 1 HOUR for hourly, 1 DAY for daily.
+   * For MINUTE level data, bucket size is calculated based on the anomaly function frequency.
+   * See also {@link DetectionJobSchedulerUtils#getBucketSizePeriodForDataset(DatasetConfigDTO, AnomalyFunctionDTO)}
+   */
   public Period getBucketSizePeriodForDataset() {
     Period bucketSizePeriod = null;
     TimeSpec timeSpec = ThirdEyeUtils.getTimeSpecFromDatasetConfig(dataset);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
index 64ffc47..df08645 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
@@ -17,10 +17,13 @@
 package com.linkedin.thirdeye.detection.wrapper;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.linkedin.thirdeye.dataframe.DoubleSeries;
 import com.linkedin.thirdeye.dataframe.Series;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionUtils;
 import com.linkedin.thirdeye.detection.DefaultInputDataFetcher;
@@ -29,11 +32,15 @@ import com.linkedin.thirdeye.detection.algorithm.MergeWrapper;
 import com.linkedin.thirdeye.detection.components.RuleBaselineProvider;
 import com.linkedin.thirdeye.detection.spec.RuleBaselineProviderSpec;
 import com.linkedin.thirdeye.detection.spi.components.BaselineProvider;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
 import com.linkedin.thirdeye.rootcause.timeseries.BaselineAggregateType;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.MapUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +48,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Baseline filling merger. This merger's merging behavior is the same as MergeWrapper. But add the capability
- * of filling baseline & current values.
+ * of filling baseline & current values and of injecting the detector and metric urn into the nested properties. Each detector has a separate baseline filling merge wrapper.
  */
 public class BaselineFillingMergeWrapper extends MergeWrapper {
   private static final Logger LOG = LoggerFactory.getLogger(MergeWrapper.class);
@@ -49,12 +56,15 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
   private static final String PROP_BASELINE_PROVIDER = "baselineValueProvider";
   private static final String PROP_CURRENT_PROVIDER = "currentValueProvider";
   private static final String PROP_METRIC_URN = "metricUrn";
-  private static final String PROP_BASELINE_PROVIDER_COMPONENT_KEY = "baselineProviderComponentKey";
+  private static final String PROP_BASELINE_PROVIDER_COMPONENT_NAME = "baselineProviderComponentName";
+  private static final String PROP_DETECTOR = "detector";
+  private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
 
   private BaselineProvider baselineValueProvider; // optionally configure a baseline value loader
   private BaselineProvider currentValueProvider;
-  private Series.DoubleFunction aggregationFunction;
   private String baselineProviderComponentKey;
+  private String detectorComponentName;
+  private String metricUrn;
 
   public BaselineFillingMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime)
   {
@@ -77,14 +87,24 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
       InputDataFetcher dataFetcher = new DefaultInputDataFetcher(this.provider, this.config.getId());
       this.currentValueProvider.init(spec, dataFetcher);
     }
+
+    // inject detector to nested property if possible
+    String detectorComponentKey = MapUtils.getString(config.getProperties(), PROP_DETECTOR);
+    if (detectorComponentKey != null){
+      this.detectorComponentName = DetectionUtils.getComponentName(detectorComponentKey);
+      for (Map<String, Object> properties : this.nestedProperties){
+        properties.put(PROP_DETECTOR, detectorComponentKey);
+      }
+    }
+
+    // inject metricUrn to nested property if possible
     String nestedUrn = MapUtils.getString(config.getProperties(), PROP_METRIC_URN);
     if (nestedUrn != null){
+      this.metricUrn = nestedUrn;
       for (Map<String, Object> properties : this.nestedProperties){
         properties.put(PROP_METRIC_URN, nestedUrn);
       }
     }
-
-    this.aggregationFunction = BaselineAggregateType.valueOf(MapUtils.getString(config.getProperties(), "metricFunction", "MEAN")).getFunction();
   }
 
   @Override
@@ -92,6 +112,26 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
     return this.fillCurrentAndBaselineValue(super.merge(anomalies));
   }
 
+  /** Fetches stored non-child anomalies around the generated ones, restricted to this wrapper's detector and metric urn when those are configured. */
+  @Override
+  protected List<MergedAnomalyResultDTO> retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+    // widen the generated time range by maxGap plus one on both sides before querying
+    AnomalySlice effectiveSlice = this.slice
+        .withStart(this.getStartTime(generated) - this.maxGap - 1)
+        .withEnd(this.getEndTime(generated) + this.maxGap + 1);
+    Collection<MergedAnomalyResultDTO> retrieved = this.provider.fetchAnomalies(Collections.singleton(effectiveSlice), this.config.getId()).get(effectiveSlice);
+    Collection<MergedAnomalyResultDTO> anomalies =
+        Collections2.filter(retrieved,
+                mergedAnomaly -> mergedAnomaly != null &&
+                !mergedAnomaly.isChild() &&
+                    // merge only anomalies generated by the same detector; the field stays null when no detector is configured, so skip the check then instead of throwing
+                (this.detectorComponentName == null || this.detectorComponentName.equals(mergedAnomaly.getProperties().getOrDefault(PROP_DETECTOR_COMPONENT_NAME, ""))) &&
+                    // merge only anomalies in the same dimension; the field stays null when no metric urn is configured, so skip the check then instead of throwing
+                (this.metricUrn == null || this.metricUrn.equals(mergedAnomaly.getMetricUrn()))
+        );
+    return new ArrayList<>(anomalies);
+  }
+
   /**
    * Fill in current and baseline value for the anomalies
    * @param mergedAnomalies anomalies
@@ -101,12 +141,25 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
     for (MergedAnomalyResultDTO anomaly : mergedAnomalies) {
       try {
         String metricUrn = anomaly.getMetricUrn();
-        final MetricSlice slice = MetricSlice.from(MetricEntity.fromURN(metricUrn).getId(), anomaly.getStartTime(), anomaly.getEndTime(),
-            MetricEntity.fromURN(metricUrn).getFilters());
+        MetricEntity me = MetricEntity.fromURN(metricUrn);
+        long metricId = me.getId();
+        MetricConfigDTO metricConfigDTO = this.provider.fetchMetrics(Collections.singletonList(metricId)).get(metricId);
+        // aggregation function
+        Series.DoubleFunction aggregationFunction = DoubleSeries.MEAN;
+
+        try {
+          aggregationFunction =
+              BaselineAggregateType.valueOf(metricConfigDTO.getDefaultAggFunction().name()).getFunction();
+        } catch (Exception e) {
+          LOG.warn("cannot get aggregation function for metric, using average", metricId);
+        }
+
+        final MetricSlice slice = MetricSlice.from(metricId, anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
         anomaly.setAvgCurrentVal(this.currentValueProvider.computePredictedAggregates(slice, aggregationFunction));
         if (this.baselineValueProvider != null) {
           anomaly.setAvgBaselineVal(this.baselineValueProvider.computePredictedAggregates(slice, aggregationFunction));
-          anomaly.getProperties().put(PROP_BASELINE_PROVIDER_COMPONENT_KEY, this.baselineProviderComponentKey);
+          anomaly.getProperties().put(PROP_BASELINE_PROVIDER_COMPONENT_NAME, this.baselineProviderComponentKey);
+          anomaly.setWeight(calculateWeight(anomaly));
         }
       } catch (Exception e) {
         // ignore
@@ -116,4 +169,7 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
     return mergedAnomalies;
   }
 
+  private double calculateWeight(MergedAnomalyResultDTO anomaly) { // relative change of the anomaly: (current - baseline) / baseline
+    return (anomaly.getAvgCurrentVal() - anomaly.getAvgBaselineVal()) / anomaly.getAvgBaselineVal(); // NOTE(review): assuming the getters return double, a zero baseline yields Infinity or NaN rather than an exception — confirm consumers of weight tolerate that
+  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
index 1382d9e..9ffc5c2 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
@@ -20,7 +20,6 @@ import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.algorithm.MergeWrapper;
-import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -37,7 +36,7 @@ import java.util.Map;
  */
 public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
   public ChildKeepingMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime)
-      throws Exception {
+  {
     super(provider, config, startTime, endTime);
   }
 
@@ -105,6 +104,7 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
     newAnomaly.setFeedback(anomaly.getFeedback());
     newAnomaly.setAnomalyFeedbackId(anomaly.getAnomalyFeedbackId());
     newAnomaly.setScore(anomaly.getScore());
+    newAnomaly.setWeight(anomaly.getWeight());
     return newAnomaly;
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 45ecf73..45cad5b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -138,6 +138,7 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
   private static final String PROP_WINDOW_UNIT = "windowUnit";
   private static final String PROP_FREQUENCY = "frequency";
   private static final String PROP_MERGER = "merger";
+  private static final String PROP_TIMEZONE = "timezone";
 
   private static final DetectionRegistry DETECTION_REGISTRY = DetectionRegistry.getInstance();
   private static final Map<String, String> DETECTOR_TO_BASELINE = ImmutableMap.of("ALGORITHM", "ALGORITHM_BASELINE");
@@ -227,7 +228,6 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
     Map<String, Object> nestedProperties = new HashMap<>();
     nestedProperties.put(PROP_CLASS_NAME, AnomalyDetectorWrapper.class.getName());
     String detectorKey = makeComponentKey(ruleName, detectorType, id);
-    nestedProperties.put(PROP_DETECTOR, detectorKey);
 
     fillInWindowSizeAndUnit(nestedProperties, yamlConfig, detectorType);
 
@@ -242,6 +242,7 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
     }
     String baselineProviderKey = makeComponentKey(ruleName, baselineProviderType, id);
     properties.put(PROP_BASELINE_PROVIDER, baselineProviderKey);
+    properties.put(PROP_DETECTOR, detectorKey);
     buildComponentSpec(yamlConfig, baselineProviderType, baselineProviderKey);
     properties.putAll(this.mergerProperties);
     return properties;
@@ -283,6 +284,9 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
       if (yamlConfig.containsKey(PROP_WINDOW_DELAY_UNIT)) {
         properties.put(PROP_WINDOW_DELAY_UNIT, MapUtils.getString(yamlConfig, PROP_WINDOW_DELAY_UNIT));
       }
+      if (yamlConfig.containsKey(PROP_TIMEZONE)){
+        properties.put(PROP_TIMEZONE, MapUtils.getString(yamlConfig, PROP_TIMEZONE));
+      }
     }
   }
 
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
index ce1c03c..0d6891e 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
@@ -40,11 +40,11 @@ public class AnomalyDetectorWrapperTest {
   private static final String PROP_METRIC_URN = "metricUrn";
   private static final String PROP_MOVING_WINDOW_DETECTION = "isMovingWindowDetection";
   private static final String PROP_DETECTOR = "detector";
+  private static final String PROP_TIMEZONE = "timezone";
 
   private MockDataProvider provider;
   private Map<String, Object> properties;
   private DetectionConfigDTO config;
-  private Map<String, Object> stageSpecs;
 
   @BeforeMethod
   public void setUp() {
@@ -80,6 +80,7 @@ public class AnomalyDetectorWrapperTest {
   @Test
   public void testMovingMonitoringWindow() {
     this.properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+    this.properties.put(PROP_TIMEZONE, TimeSpec.DEFAULT_TIMEZONE);
     AnomalyDetectorWrapper detectionPipeline =
         new AnomalyDetectorWrapper(this.provider, this.config, 1540147725000L, 1540493325000L);
     List<Interval> monitoringWindows = detectionPipeline.getMonitoringWindows();
@@ -89,4 +90,17 @@ public class AnomalyDetectorWrapperTest {
             new Interval(1540252800000L, 1540339200000L, timeZone), new Interval(1540339200000L, 1540425600000L, timeZone)));
   }
 
+  @Test // start time is an exact multiple of 86,400,000 ms; expects four back-to-back one-day windows
+  public void testMovingMonitoringWindowBoundary() {
+    this.properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+    this.properties.put(PROP_TIMEZONE, TimeSpec.DEFAULT_TIMEZONE);
+    DateTimeZone zone = DateTimeZone.forID(TimeSpec.DEFAULT_TIMEZONE);
+    List<Interval> expectedWindows = Arrays.asList(
+        new Interval(1540080000000L, 1540166400000L, zone), new Interval(1540166400000L, 1540252800000L, zone),
+        new Interval(1540252800000L, 1540339200000L, zone), new Interval(1540339200000L, 1540425600000L, zone));
+    AnomalyDetectorWrapper wrapper =
+        new AnomalyDetectorWrapper(this.provider, this.config, 1540080000000L, 1540425600000L);
+    Assert.assertEquals(wrapper.getMonitoringWindows(), expectedWindows);
+  }
+
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
index 998c8eb..481c6f0 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
@@ -17,10 +17,12 @@
 package com.linkedin.thirdeye.detection.wrapper;
 
 import com.google.common.collect.ImmutableMap;
+import com.linkedin.thirdeye.constant.MetricAggFunction;
 import com.linkedin.thirdeye.dataframe.DataFrame;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DefaultInputDataFetcher;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
@@ -95,16 +97,22 @@ public class BaselineFillingMergeWrapperTest {
   @Test
   public void testMergerCurrentAndBaselineLoading() throws Exception {
     MergedAnomalyResultDTO anomaly = makeAnomaly(3000, 3600);
+    anomaly.setProperties(ImmutableMap.of("detectorComponentName", "testDetector"));
     anomaly.setMetricUrn("thirdeye:metric:1");
 
     Map<MetricSlice, DataFrame> aggregates = new HashMap<>();
     aggregates.put(MetricSlice.from(1, 3000, 3600), DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(-1, 100).build());
 
+    MetricConfigDTO metric = new MetricConfigDTO();
+    metric.setId(1L);
+    metric.setDefaultAggFunction(MetricAggFunction.SUM);
     DataProvider
-        provider = new MockDataProvider().setLoader(new MockPipelineLoader(this.runs, Collections.<MockPipelineOutput>emptyList())).setAnomalies(Collections.singletonList(anomaly)).setAggregates(aggregates);
+        provider = new MockDataProvider().setLoader(new MockPipelineLoader(this.runs, Collections.<MockPipelineOutput>emptyList())).setAnomalies(Collections.singletonList(anomaly)).setAggregates(aggregates)
+        .setMetrics(Collections.singletonList(metric));
 
     this.config.getProperties().put(PROP_MAX_GAP, 100);
     this.config.getProperties().put(PROP_BASELINE_PROVIDER, "$baseline");
+    this.config.getProperties().put("detector", "$testDetector");
     BaselineProvider baselineProvider = new MockBaselineProvider();
     MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
     spec.setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), 100.0));
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
index 050043c..65fb1b5 100644
--- a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
+++ b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
@@ -17,9 +17,9 @@
             "className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
             "maxGap" : 0,
             "nested" : [ {
-              "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
-              "detector" : "$rule1:THRESHOLD:0"
+              "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper"
             } ],
+            "detector" : "$rule1:THRESHOLD:0",
             "maxDuration" : 100
           } ]
         } ]
@@ -31,9 +31,9 @@
           "className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
           "maxGap" : 0,
           "nested" : [ {
-            "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
-            "detector" : "$rule2:THRESHOLD:0"
+            "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper"
           } ],
+          "detector" : "$rule2:THRESHOLD:0",
           "maxDuration" : 100
         } ]
       } ],
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
index 2d875a4..bd43106 100644
--- a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
+++ b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
@@ -9,9 +9,9 @@
         "baselineValueProvider" : "$rule1:RULE_BASELINE:0",
         "className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
         "nested" : [ {
-          "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
-          "detector" : "$rule1:THRESHOLD:0"
-        } ]
+          "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper"
+        } ],
+        "detector" : "$rule1:THRESHOLD:0"
       } ],
       "minContribution" : 0.05,
       "dimensions" : [ "D1", "D2" ]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org