You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ak...@apache.org on 2019/01/30 21:49:31 UTC

[incubator-pinot] branch master updated: [TE] Polish the migration endpoint and fix corner cases (#3765)

This is an automated email from the ASF dual-hosted git repository.

akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 37fb02b  [TE] Polish the migration endpoint and fix corner cases (#3765)
37fb02b is described below

commit 37fb02bc04254e75dc379266dbc87c869e8356cb
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Wed Jan 30 13:49:26 2019 -0800

    [TE] Polish the migration endpoint and fix corner cases (#3765)
---
 .../dashboard/ThirdEyeDashboardApplication.java    |   2 +-
 .../detection/DetectionMigrationResource.java      | 164 ++++++++++++++-------
 .../yaml/YamlDetectionAlertConfigTranslator.java   |   2 +-
 .../detection/DetectionMigrationResourceTest.java  |   2 +-
 4 files changed, 116 insertions(+), 54 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
index b38e7df..bfa7412 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
@@ -174,7 +174,7 @@ public class ThirdEyeDashboardApplication
     env.jersey().register(new DataResource(anomalyFunctionFactory, alertFilterFactory));
     env.jersey().register(new AnomaliesResource(anomalyFunctionFactory, alertFilterFactory));
     env.jersey().register(new DetectionMigrationResource(
-        DAO_REGISTRY.getAnomalyFunctionDAO(), DAO_REGISTRY.getAlertConfigDAO(),
+        DAO_REGISTRY.getAnomalyFunctionDAO(), DAO_REGISTRY.getAlertConfigDAO(), DAO_REGISTRY.getMetricConfigDAO(),
         DAO_REGISTRY.getDetectionConfigManager(), DAO_REGISTRY.getDetectionAlertConfigManager(),
         DAO_REGISTRY.getDatasetConfigDAO(), DAO_REGISTRY.getMergedAnomalyResultDAO()));
     env.jersey().register(new OnboardResource(config));
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
index 302f72a..19e91f5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
@@ -19,7 +19,6 @@
 
 package org.apache.pinot.thirdeye.detection;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -30,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -39,6 +37,7 @@ import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.xml.bind.ValidationException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.thirdeye.anomaly.detection.AnomalyDetectionInputContextBuilder;
 import org.apache.pinot.thirdeye.datalayer.bao.AlertConfigManager;
@@ -47,12 +46,14 @@ import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
 import org.apache.pinot.thirdeye.datalayer.dto.AlertConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFunctionDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.util.Predicate;
 import org.apache.pinot.thirdeye.datasource.DAORegistry;
 import org.apache.pinot.thirdeye.detection.yaml.YamlDetectionAlertConfigTranslator;
@@ -77,7 +78,6 @@ public class DetectionMigrationResource {
   private static final String PROP_WINDOW_DELAY_UNIT = "windowDelayUnit";
   private static final String PROP_WINDOW_SIZE = "windowSize";
   private static final String PROP_WINDOW_UNIT = "windowUnit";
-  private static final String PROP_DETECTION_CONFIG_IDS = "detectionConfigIds";
 
   static final String MIGRATED_TAG = "_thirdeye_migrated";
 
@@ -87,6 +87,7 @@ public class DetectionMigrationResource {
   private final DatasetConfigManager datasetConfigDAO;
   private final MergedAnomalyResultManager mergedAnomalyResultDAO;
   private final AlertConfigManager alertConfigDAO;
+  private final MetricConfigManager metricConfigDAO;
   private final Yaml yaml;
 
   /**
@@ -95,6 +96,7 @@ public class DetectionMigrationResource {
   public DetectionMigrationResource(
       AnomalyFunctionManager anomalyFunctionDAO,
       AlertConfigManager alertConfigDAO,
+      MetricConfigManager metricConfigDAO,
       DetectionConfigManager detectionConfigDAO,
       DetectionAlertConfigManager detectionAlertConfigDAO,
       DatasetConfigManager datasetConfigDAO,
@@ -103,6 +105,7 @@ public class DetectionMigrationResource {
     this.detectionConfigDAO = detectionConfigDAO;
     this.detectionAlertConfigDAO = detectionAlertConfigDAO;
     this.alertConfigDAO = alertConfigDAO;
+    this.metricConfigDAO = metricConfigDAO;
     this.datasetConfigDAO = datasetConfigDAO;
     this.mergedAnomalyResultDAO = mergedAnomalyResultDAO;
     DumperOptions options = new DumperOptions();
@@ -238,8 +241,6 @@ public class DetectionMigrationResource {
 
   private String getTimezone(AnomalyFunctionDTO functionDTO) {
     DatasetConfigDTO datasetConfigDTO = this.datasetConfigDAO.findByDataset(functionDTO.getCollection());
-    Preconditions.checkNotNull(datasetConfigDTO,
-        String.format("Dataset %s cannot be found for anomaly %d", functionDTO.getCollection(), functionDTO.getId()));
     return datasetConfigDTO.getTimezone();
   }
 
@@ -291,11 +292,11 @@ public class DetectionMigrationResource {
     return detectorYaml;
   }
 
-  long migrateLegacyAnomalyFunction(long anomalyFunctionId) {
+  long migrateLegacyAnomalyFunction(long anomalyFunctionId) throws ValidationException {
     AnomalyFunctionDTO anomalyFunctionDTO = this.anomalyFunctionDAO.findById(anomalyFunctionId);
-    if (anomalyFunctionDTO == null) {
-      throw new RuntimeException(String.format("Couldn't find anomaly function with id %d", anomalyFunctionId));
-    }
+
+    // Verify if function is still valid
+    validateFunction(anomalyFunctionDTO);
 
     return migrateLegacyAnomalyFunction(anomalyFunctionDTO);
   }
@@ -333,6 +334,10 @@ public class DetectionMigrationResource {
       // Point all the associated anomalies to the migrated anomaly function.
       List<MergedAnomalyResultDTO> mergedAnomalyResultDTOS = mergedAnomalyResultDAO.findByPredicate(Predicate.EQ("functionId", anomalyFunctionDTO.getId()));
       for (MergedAnomalyResultDTO anomaly : mergedAnomalyResultDTOS) {
+        // Drop the baseline and current values from the anomalies.
+        if (anomaly.getProperties() != null) {
+          anomaly.getProperties().remove("anomalyTimelinesView");
+        }
         anomaly.setDetectionConfigId(detectionConfig.getId());
         int affectedRows = mergedAnomalyResultDAO.update(anomaly);
         if (affectedRows == 0) {
@@ -351,6 +356,8 @@ public class DetectionMigrationResource {
             + detectionConfig.getId());
       }
     } catch (Exception e) {
+      LOGGER.error("[MIG] Failed to migrate anomaly function id {} name {}.", anomalyFunctionDTO.getId(),
+          anomalyFunctionDTO.getFunctionName(), e);
       throw new RuntimeException(String.format("Error migrating anomaly function %d due to %s",
           anomalyFunctionDTO.getId(), e.getMessage()), e);
     }
@@ -360,6 +367,85 @@ public class DetectionMigrationResource {
     return detectionConfig.getId();
   }
 
+  private void migrateLegacyNotification(AlertConfigDTO alertConfigDTO) {
+    String alertName = alertConfigDTO.getName();
+
+    try {
+      LOGGER.info(String.format("[MIG] Migrating alert %d %s", alertConfigDTO.getId(), alertName));
+
+      // Skip if the alert is already migrated
+      if (alertConfigDTO.getName().contains(MIGRATED_TAG)) {
+        LOGGER.info(String.format("[MIG] Alert %d is already migrated. Skipping!", alertConfigDTO.getId()));
+        return;
+      }
+
+      // Migrate all the subscribed anomaly functions. Note that this will update the state of old anomaly functions.
+      List<Long> detectionIds = ConfigUtils.getLongs(alertConfigDTO.getEmailConfig().getFunctionIds());
+      List<Long> filteredIds = new ArrayList<>();
+      for (long detectionId : detectionIds) {
+        try {
+          migrateLegacyAnomalyFunction(detectionId);
+          filteredIds.add(detectionId);
+        } catch (ValidationException e) {
+          // Ignore those anomaly functions which are pointing to invalid entities
+          LOGGER.warn("[MIG] Error while migrating anomaly function. Error " + e.getMessage());
+        }
+      }
+      alertConfigDTO.getEmailConfig().setFunctionIds(filteredIds);
+
+      // Translate the old alert and capture the state.
+      Map<String, Object> detectionAlertYaml = translateAlertToYaml(alertConfigDTO);
+
+      // Migrate the alert/notification group
+      DetectionAlertConfigDTO alertConfig = new YamlDetectionAlertConfigTranslator(detectionConfigDAO).translate(detectionAlertYaml);
+      List<DetectionAlertConfigDTO> alertDTOs = detectionAlertConfigDAO.findByPredicate(Predicate.EQ("name", alertConfig.getName()));
+      if (!alertDTOs.isEmpty()) {
+        LOGGER.warn("[MIG] Looks like this alert was already migrated. old id = " + alertConfig.getId() + " new id = "
+            + alertDTOs.get(0).getId());
+      } else {
+        detectionAlertConfigDAO.save(alertConfig);
+        if (alertConfig.getId() == null) {
+          throw new RuntimeException("Error while saving the migrated alert config for " + alertName);
+        }
+      }
+
+      // Update migration status and disable the old alert
+      alertConfigDTO.setName(alertName + MIGRATED_TAG + "_" + alertConfig.getId());
+      alertConfigDTO.setActive(false);
+      int affectedRows = alertConfigDAO.update(alertConfigDTO);
+      if (affectedRows == 0) {
+        throw new RuntimeException(
+            "Alert migrated successfully but failed to disable and update the migration status" + " of the old alert."
+                + " Migrated alert id " + alertConfig.getId());
+      }
+
+    } catch (Exception e) {
+      LOGGER.error("[MIG] Failed to migrate alert ID {} name {}.", alertConfigDTO.getId(), alertConfigDTO.getName(), e);
+      throw new RuntimeException(String.format("Error migrating alert ID %d due to %s",
+          alertConfigDTO.getId(), e.getMessage()), e);
+    }
+
+    LOGGER.info(String.format("[MIG] Successfully migrated alert %d %s", alertConfigDTO.getId(), alertName));
+  }
+
+  private void validateFunction(AnomalyFunctionDTO functionDTO) throws ValidationException {
+    if (functionDTO == null) {
+      throw new ValidationException("Couldn't find anomaly function.");
+    }
+
+    List<DatasetConfigDTO> datasetConfigDTOs = this.datasetConfigDAO.findByPredicate(Predicate.EQ("dataset", functionDTO.getCollection()));
+    if (datasetConfigDTOs.isEmpty()) {
+      throw new ValidationException("Dataset cannot be found for anomaly function " + functionDTO.getId());
+    }
+
+    List<MetricConfigDTO> metricConfigDTOs = this.metricConfigDAO.findByPredicate(Predicate.AND(
+            Predicate.EQ("name", functionDTO.getMetric()),
+            Predicate.EQ("dataset", functionDTO.getCollection())));
+    if (metricConfigDTOs.isEmpty()) {
+      throw new ValidationException("Metric cannot be found for anomaly function " + functionDTO.getId());
+    }
+  }
+
   Map<String, Object> translateAlertToYaml(AlertConfigDTO alertConfigDTO) {
     Map<String, Object> yamlConfigs = new LinkedHashMap<>();
 
@@ -386,11 +472,19 @@ public class DetectionMigrationResource {
 
     List<String> detectionNames = new ArrayList<>();
     List<Long> detectionIds = alertConfigDTO.getEmailConfig().getFunctionIds();
-    try {
-      detectionNames.addAll(detectionIds.stream().map(detectionId ->  this.anomalyFunctionDAO.findByPredicate(
-          Predicate.EQ("baseId", detectionId)).get(0).getFunctionName()).collect(Collectors.toList()));
-    } catch (Exception e){
-      throw new IllegalArgumentException("cannot find subscribed detection id, please check the function ids");
+    for (Long id : detectionIds) {
+      List<AnomalyFunctionDTO> functionDTOS = this.anomalyFunctionDAO.findByPredicate(Predicate.EQ("baseId", id));
+      if (functionDTOS.isEmpty()) {
+        LOGGER.warn("[MIG] Anomaly function " + id + " cannot be found while generating notification YAML from legacy notification.");
+        // Ignore missing anomaly functions
+        continue;
+      }
+
+      String functionName = functionDTOS.get(0).getFunctionName();
+      if (functionName.contains(MIGRATED_TAG)) {
+        functionName = functionName.substring(0, functionName.lastIndexOf(MIGRATED_TAG));
+      }
+      detectionNames.add(functionName);
     }
     yamlConfigs.put(PROP_DETECTION_NAMES, detectionNames);
 
@@ -434,47 +528,15 @@ public class DetectionMigrationResource {
     Map<String, String> responseMessage = new HashMap<>();
 
     for (AlertConfigDTO alertConfigDTO : alertConfigDTOList) {
-      String alertName = alertConfigDTO.getName();
-      LOGGER.info(String.format("[MIG] Migrating alert %d %s", alertConfigDTO.getId(), alertName));
       try {
-        // Skip if the alert is already migrated
-        if (alertConfigDTO.getName().contains(MIGRATED_TAG)) {
-          LOGGER.info(String.format("[MIG] Alert %d is already migrated. Skipping!", alertConfigDTO.getId()));
-          continue;
-        }
-
-        // Translate the old alert and capture the state
-        Map<String, Object> detectionAlertYaml = translateAlertToYaml(alertConfigDTO);
-
-        // Migrate all the subscribed anomaly functions. Note that this will update the state of old anomaly functions.
-        List<Long> detectionIds = ConfigUtils.getLongs(alertConfigDTO.getEmailConfig().getFunctionIds());
-        for (long detectionId : detectionIds) {
-          migrateLegacyAnomalyFunction(detectionId);
-        }
-
-        // Migrate the alert/notification group
-        DetectionAlertConfigDTO alertConfig = new YamlDetectionAlertConfigTranslator(detectionConfigDAO).translate(detectionAlertYaml);
-        detectionAlertConfigDAO.save(alertConfig);
-        if (alertConfig.getId() == null) {
-          throw new RuntimeException("Error while saving the migrated alert config for " + alertName);
-        }
-
-        // Update migration status and disable the old alert
-        alertConfigDTO.setName(alertName + MIGRATED_TAG + "_" + alertConfig.getId());
-        alertConfigDTO.setActive(false);
-        int affectedRows = alertConfigDAO.update(alertConfigDTO);
-        if (affectedRows == 0) {
-          throw new RuntimeException("Alert migrated successfully but failed to disable and update the migration status"
-              + " of the old alert. Recommend doing it manually." + " Migrated alert id " + alertConfig.getId());
-        }
-        LOGGER.info(String.format("[MIG] Successfully migrated alert %d %s", alertConfigDTO.getId(), alertName));
+        migrateLegacyNotification(alertConfigDTO);
       } catch (Exception e) {
-        LOGGER.error("[MIG] Failed to migrate alert ID {} name {}.", alertConfigDTO.getId(), alertName, e);
-        responseMessage.put("Status of alert " + alertConfigDTO.getId(),
-            String.format("Failed to migrate alert ID %d with name %s due to %s", alertConfigDTO.getId(), alertName,
-                e.getMessage()));
         // Skip migrating this alert and move on to the next
+        responseMessage.put("Status of alert " + alertConfigDTO.getId(),
+            String.format("Failed to migrate alert ID %d with name %s due to %s", alertConfigDTO.getId(),
+                alertConfigDTO.getName(), e.getMessage()));
       }
+
     }
 
     if (responseMessage.isEmpty()) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
index b64b8ee..2ed1a9a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
@@ -199,7 +199,7 @@ public class YamlDetectionAlertConfigTranslator {
       detectionConfigIds.addAll(detectionNames.stream().map(detectionName ->  this.detectionConfigDAO.findByPredicate(
           Predicate.EQ("name", detectionName)).get(0).getId()).collect(Collectors.toList()));
     } catch (Exception e){
-      throw new IllegalArgumentException("cannot find detection pipeline, please check the subscribed detections.");
+      throw new IllegalArgumentException("Cannot find detection pipeline, please check the subscribed detections.");
     }
 
     alertConfigDTO.setProperties(buildAlerterProperties(yamlAlertConfig, detectionConfigIds));
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResourceTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResourceTest.java
index 63327f9..85ea55e 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResourceTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResourceTest.java
@@ -94,7 +94,7 @@ public class DetectionMigrationResourceTest {
     this.applicationDAO = DAORegistry.getInstance().getApplicationDAO();
 
     migrationResource = new DetectionMigrationResource(
-        anomalyFunctionDAO, alertConfigDAO, detectionConfigDAO, detectionAlertConfigDAO, datasetDAO, anomalyDAO);
+        anomalyFunctionDAO, alertConfigDAO, metricDAO, detectionConfigDAO, detectionAlertConfigDAO, datasetDAO, anomalyDAO);
 
     DetectionRegistry.registerYamlConvertor(CompositePipelineConfigTranslator.class.getName(), "COMPOSITE");
     DetectionRegistry.registerComponent(PercentageChangeRuleDetector.class.getName(), "PERCENTAGE_RULE");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org