You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ak...@apache.org on 2019/03/06 22:31:33 UTC
[incubator-pinot] branch master updated: [TE] Option to
retain/force delete anomalies during replay (#3916)
This is an automated email from the ASF dual-hosted git repository.
akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cda36d7 [TE] Option to retain/force delete anomalies during replay (#3916)
cda36d7 is described below
commit cda36d70a782dbe5c376b581d18d481cf49c247e
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Wed Mar 6 14:31:29 2019 -0800
[TE] Option to retain/force delete anomalies during replay (#3916)
---
.../thirdeye/detection/DetectionResource.java | 46 ++++++++++++----------
1 file changed, 26 insertions(+), 20 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
index e3d7de7..209cc8a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
@@ -372,13 +372,12 @@ public class DetectionResource {
DetectionPipeline pipeline = this.loader.from(this.provider, config, monitoringWindow.getStartMillis(), monitoringWindow.getEndMillis());
DetectionPipelineResult result = pipeline.run();
- // save state
- if (result.getLastTimestamp() > 0) {
+ // Update
+ if (result.getLastTimestamp() > config.getLastTimestamp()) {
config.setLastTimestamp(result.getLastTimestamp());
+ this.configDAO.update(config);
}
- this.configDAO.update(config);
-
for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
this.anomalyDAO.save(anomaly);
@@ -393,41 +392,48 @@ public class DetectionResource {
/**
* Replay for a given time range. Without cron schedule behavior
+ *
+ * @param detectionId detection config id (must exist)
+ * @param start start time in epoch (millis)
+ * @param end end time in epoch (millis)
+ * @param deleteExistingAnomaly (optional, default false) delete existing anomaly or not
*/
@POST
@Path("/replay/{id}")
public Response detectionReplay(
- @PathParam("id") long configId,
+ @PathParam("id") long detectionId,
@QueryParam("start") long start,
- @QueryParam("end") long end) throws Exception {
+ @QueryParam("end") long end,
+ @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean deleteExistingAnomaly) throws Exception {
- DetectionConfigDTO config = this.configDAO.findById(configId);
+ DetectionConfigDTO config = this.configDAO.findById(detectionId);
if (config == null) {
- throw new IllegalArgumentException(String.format("Cannot find config %d", configId));
+ throw new IllegalArgumentException(String.format("Cannot find config %d", detectionId));
}
- // clear existing anomalies
- AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
- Collection<MergedAnomalyResultDTO> existing = this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+ if (deleteExistingAnomaly) {
+ AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
+ Collection<MergedAnomalyResultDTO> existing =
+ this.provider.fetchAnomalies(Collections.singleton(slice), detectionId).get(slice);
- List<Long> existingIds = new ArrayList<>();
- for (MergedAnomalyResultDTO anomaly : existing) {
- existingIds.add(anomaly.getId());
- }
+ List<Long> existingIds = new ArrayList<>();
+ for (MergedAnomalyResultDTO anomaly : existing) {
+ existingIds.add(anomaly.getId());
+ }
- this.anomalyDAO.deleteByIds(existingIds);
+ this.anomalyDAO.deleteByIds(existingIds);
+ }
// execute replay
DetectionPipeline pipeline = this.loader.from(this.provider, config, start, end);
DetectionPipelineResult result = pipeline.run();
- // save state
- if (result.getLastTimestamp() > 0) {
+ // Update state
+ if (result.getLastTimestamp() > config.getLastTimestamp()) {
config.setLastTimestamp(result.getLastTimestamp());
+ this.configDAO.update(config);
}
- this.configDAO.update(config);
-
for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
this.anomalyDAO.save(anomaly);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org