You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ak...@apache.org on 2019/03/06 22:31:33 UTC

[incubator-pinot] branch master updated: [TE] Option to retain/force delete anomalies during replay (#3916)

This is an automated email from the ASF dual-hosted git repository.

akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cda36d7  [TE] Option to retain/force delete anomalies during replay (#3916)
cda36d7 is described below

commit cda36d70a782dbe5c376b581d18d481cf49c247e
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Wed Mar 6 14:31:29 2019 -0800

    [TE] Option to retain/force delete anomalies during replay (#3916)
---
 .../thirdeye/detection/DetectionResource.java      | 46 ++++++++++++----------
 1 file changed, 26 insertions(+), 20 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
index e3d7de7..209cc8a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
@@ -372,13 +372,12 @@ public class DetectionResource {
       DetectionPipeline pipeline = this.loader.from(this.provider, config, monitoringWindow.getStartMillis(), monitoringWindow.getEndMillis());
       DetectionPipelineResult result = pipeline.run();
 
-      // save state
-      if (result.getLastTimestamp() > 0) {
+      // Update
+      if (result.getLastTimestamp() > config.getLastTimestamp()) {
         config.setLastTimestamp(result.getLastTimestamp());
+        this.configDAO.update(config);
       }
 
-      this.configDAO.update(config);
-
       for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
         anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
         this.anomalyDAO.save(anomaly);
@@ -393,41 +392,48 @@ public class DetectionResource {
 
   /**
    * Replay for a given time range. Without cron schedule behavior
+   *
+   * @param detectionId detection config id (must exist)
+   * @param start start time in epoch (millis)
+   * @param end end time in epoch (millis)
+   * @param deleteExistingAnomaly (optional, default false) delete existing anomaly or not
    */
   @POST
   @Path("/replay/{id}")
   public Response detectionReplay(
-      @PathParam("id") long configId,
+      @PathParam("id") long detectionId,
       @QueryParam("start") long start,
-      @QueryParam("end") long end) throws Exception {
+      @QueryParam("end") long end,
+      @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean deleteExistingAnomaly) throws Exception {
 
-    DetectionConfigDTO config = this.configDAO.findById(configId);
+    DetectionConfigDTO config = this.configDAO.findById(detectionId);
     if (config == null) {
-      throw new IllegalArgumentException(String.format("Cannot find config %d", configId));
+      throw new IllegalArgumentException(String.format("Cannot find config %d", detectionId));
     }
 
-    // clear existing anomalies
-    AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
-    Collection<MergedAnomalyResultDTO> existing = this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+    if (deleteExistingAnomaly) {
+      AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
+      Collection<MergedAnomalyResultDTO> existing =
+          this.provider.fetchAnomalies(Collections.singleton(slice), detectionId).get(slice);
 
-    List<Long> existingIds = new ArrayList<>();
-    for (MergedAnomalyResultDTO anomaly : existing) {
-      existingIds.add(anomaly.getId());
-    }
+      List<Long> existingIds = new ArrayList<>();
+      for (MergedAnomalyResultDTO anomaly : existing) {
+        existingIds.add(anomaly.getId());
+      }
 
-    this.anomalyDAO.deleteByIds(existingIds);
+      this.anomalyDAO.deleteByIds(existingIds);
+    }
 
     // execute replay
     DetectionPipeline pipeline = this.loader.from(this.provider, config, start, end);
     DetectionPipelineResult result = pipeline.run();
 
-    // save state
-    if (result.getLastTimestamp() > 0) {
+    // Update state
+    if (result.getLastTimestamp() > config.getLastTimestamp()) {
       config.setLastTimestamp(result.getLastTimestamp());
+      this.configDAO.update(config);
     }
 
-    this.configDAO.update(config);
-
     for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
       anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
       this.anomalyDAO.save(anomaly);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org