You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ak...@apache.org on 2019/01/30 21:49:31 UTC
[incubator-pinot] branch master updated: [TE] Polish the migration
endpoint and fix corner cases (#3765)
This is an automated email from the ASF dual-hosted git repository.
akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 37fb02b [TE] Polish the migration endpoint and fix corner cases (#3765)
37fb02b is described below
commit 37fb02bc04254e75dc379266dbc87c869e8356cb
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Wed Jan 30 13:49:26 2019 -0800
[TE] Polish the migration endpoint and fix corner cases (#3765)
---
.../dashboard/ThirdEyeDashboardApplication.java | 2 +-
.../detection/DetectionMigrationResource.java | 164 ++++++++++++++-------
.../yaml/YamlDetectionAlertConfigTranslator.java | 2 +-
.../detection/DetectionMigrationResourceTest.java | 2 +-
4 files changed, 116 insertions(+), 54 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
index b38e7df..bfa7412 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
@@ -174,7 +174,7 @@ public class ThirdEyeDashboardApplication
env.jersey().register(new DataResource(anomalyFunctionFactory, alertFilterFactory));
env.jersey().register(new AnomaliesResource(anomalyFunctionFactory, alertFilterFactory));
env.jersey().register(new DetectionMigrationResource(
- DAO_REGISTRY.getAnomalyFunctionDAO(), DAO_REGISTRY.getAlertConfigDAO(),
+ DAO_REGISTRY.getAnomalyFunctionDAO(), DAO_REGISTRY.getAlertConfigDAO(), DAO_REGISTRY.getMetricConfigDAO(),
DAO_REGISTRY.getDetectionConfigManager(), DAO_REGISTRY.getDetectionAlertConfigManager(),
DAO_REGISTRY.getDatasetConfigDAO(), DAO_REGISTRY.getMergedAnomalyResultDAO()));
env.jersey().register(new OnboardResource(config));
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
index 302f72a..19e91f5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
@@ -19,7 +19,6 @@
package org.apache.pinot.thirdeye.detection;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.ArrayList;
@@ -30,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -39,6 +37,7 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import javax.xml.bind.ValidationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.thirdeye.anomaly.detection.AnomalyDetectionInputContextBuilder;
import org.apache.pinot.thirdeye.datalayer.bao.AlertConfigManager;
@@ -47,12 +46,14 @@ import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
import org.apache.pinot.thirdeye.datalayer.dto.AlertConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFunctionDTO;
import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
import org.apache.pinot.thirdeye.datalayer.util.Predicate;
import org.apache.pinot.thirdeye.datasource.DAORegistry;
import org.apache.pinot.thirdeye.detection.yaml.YamlDetectionAlertConfigTranslator;
@@ -77,7 +78,6 @@ public class DetectionMigrationResource {
private static final String PROP_WINDOW_DELAY_UNIT = "windowDelayUnit";
private static final String PROP_WINDOW_SIZE = "windowSize";
private static final String PROP_WINDOW_UNIT = "windowUnit";
- private static final String PROP_DETECTION_CONFIG_IDS = "detectionConfigIds";
static final String MIGRATED_TAG = "_thirdeye_migrated";
@@ -87,6 +87,7 @@ public class DetectionMigrationResource {
private final DatasetConfigManager datasetConfigDAO;
private final MergedAnomalyResultManager mergedAnomalyResultDAO;
private final AlertConfigManager alertConfigDAO;
+ private final MetricConfigManager metricConfigDAO;
private final Yaml yaml;
/**
@@ -95,6 +96,7 @@ public class DetectionMigrationResource {
public DetectionMigrationResource(
AnomalyFunctionManager anomalyFunctionDAO,
AlertConfigManager alertConfigDAO,
+ MetricConfigManager metricConfigDAO,
DetectionConfigManager detectionConfigDAO,
DetectionAlertConfigManager detectionAlertConfigDAO,
DatasetConfigManager datasetConfigDAO,
@@ -103,6 +105,7 @@ public class DetectionMigrationResource {
this.detectionConfigDAO = detectionConfigDAO;
this.detectionAlertConfigDAO = detectionAlertConfigDAO;
this.alertConfigDAO = alertConfigDAO;
+ this.metricConfigDAO = metricConfigDAO;
this.datasetConfigDAO = datasetConfigDAO;
this.mergedAnomalyResultDAO = mergedAnomalyResultDAO;
DumperOptions options = new DumperOptions();
@@ -238,8 +241,6 @@ public class DetectionMigrationResource {
private String getTimezone(AnomalyFunctionDTO functionDTO) {
DatasetConfigDTO datasetConfigDTO = this.datasetConfigDAO.findByDataset(functionDTO.getCollection());
- Preconditions.checkNotNull(datasetConfigDTO,
- String.format("Dataset %s cannot be found for anomaly %d", functionDTO.getCollection(), functionDTO.getId()));
return datasetConfigDTO.getTimezone();
}
@@ -291,11 +292,11 @@ public class DetectionMigrationResource {
return detectorYaml;
}
- long migrateLegacyAnomalyFunction(long anomalyFunctionId) {
+ long migrateLegacyAnomalyFunction(long anomalyFunctionId) throws ValidationException {
AnomalyFunctionDTO anomalyFunctionDTO = this.anomalyFunctionDAO.findById(anomalyFunctionId);
- if (anomalyFunctionDTO == null) {
- throw new RuntimeException(String.format("Couldn't find anomaly function with id %d", anomalyFunctionId));
- }
+
+ // Verify if function is still valid
+ validateFunction(anomalyFunctionDTO);
return migrateLegacyAnomalyFunction(anomalyFunctionDTO);
}
@@ -333,6 +334,10 @@ public class DetectionMigrationResource {
// Point all the associated anomalies to the migrated anomaly function.
List<MergedAnomalyResultDTO> mergedAnomalyResultDTOS = mergedAnomalyResultDAO.findByPredicate(Predicate.EQ("functionId", anomalyFunctionDTO.getId()));
for (MergedAnomalyResultDTO anomaly : mergedAnomalyResultDTOS) {
+ // Drop the baseline and current values from the anomalies.
+ if (anomaly.getProperties() != null) {
+ anomaly.getProperties().remove("anomalyTimelinesView");
+ }
anomaly.setDetectionConfigId(detectionConfig.getId());
int affectedRows = mergedAnomalyResultDAO.update(anomaly);
if (affectedRows == 0) {
@@ -351,6 +356,8 @@ public class DetectionMigrationResource {
+ detectionConfig.getId());
}
} catch (Exception e) {
+ LOGGER.error("[MIG] Failed to migrate anomaly function id {} name {}.", anomalyFunctionDTO.getId(),
+ anomalyFunctionDTO.getFunctionName(), e);
throw new RuntimeException(String.format("Error migrating anomaly function %d due to %s",
anomalyFunctionDTO.getId(), e.getMessage()), e);
}
@@ -360,6 +367,85 @@ public class DetectionMigrationResource {
return detectionConfig.getId();
}
+ private void migrateLegacyNotification(AlertConfigDTO alertConfigDTO) {
+ String alertName = alertConfigDTO.getName();
+
+ try {
+ LOGGER.info(String.format("[MIG] Migrating alert %d %s", alertConfigDTO.getId(), alertName));
+
+ // Skip if the alert is already migrated
+ if (alertConfigDTO.getName().contains(MIGRATED_TAG)) {
+ LOGGER.info(String.format("[MIG] Alert %d is already migrated. Skipping!", alertConfigDTO.getId()));
+ return;
+ }
+
+ // Migrate all the subscribed anomaly functions. Note that this will update the state of old anomaly functions.
+ List<Long> detectionIds = ConfigUtils.getLongs(alertConfigDTO.getEmailConfig().getFunctionIds());
+ List<Long> filteredIds = new ArrayList<>();
+ for (long detectionId : detectionIds) {
+ try {
+ migrateLegacyAnomalyFunction(detectionId);
+ filteredIds.add(detectionId);
+ } catch (ValidationException e) {
+ // Ignore those anomaly functions which are pointing to invalid entities
+ LOGGER.warn("[MIG] Error while migrating anomaly function. Error " + e.getMessage());
+ }
+ }
+ alertConfigDTO.getEmailConfig().setFunctionIds(filteredIds);
+
+ // Translate the old alert and capture the state.
+ Map<String, Object> detectionAlertYaml = translateAlertToYaml(alertConfigDTO);
+
+ // Migrate the alert/notification group
+ DetectionAlertConfigDTO alertConfig = new YamlDetectionAlertConfigTranslator(detectionConfigDAO).translate(detectionAlertYaml);
+ List<DetectionAlertConfigDTO> alertDTOs = detectionAlertConfigDAO.findByPredicate(Predicate.EQ("name", alertConfig.getName()));
+ if (!alertDTOs.isEmpty()) {
+ LOGGER.warn("[MIG] Looks like this alert was already migrated. old id = " + alertConfig.getId() + " new id = "
+ + alertDTOs.get(0).getId());
+ } else {
+ detectionAlertConfigDAO.save(alertConfig);
+ if (alertConfig.getId() == null) {
+ throw new RuntimeException("Error while saving the migrated alert config for " + alertName);
+ }
+ }
+
+ // Update migration status and disable the old alert
+ alertConfigDTO.setName(alertName + MIGRATED_TAG + "_" + alertConfig.getId());
+ alertConfigDTO.setActive(false);
+ int affectedRows = alertConfigDAO.update(alertConfigDTO);
+ if (affectedRows == 0) {
+ throw new RuntimeException(
+ "Alert migrated successfully but failed to disable and update the migration status" + " of the old alert."
+ + " Migrated alert id " + alertConfig.getId());
+ }
+
+ } catch (Exception e) {
+ LOGGER.error("[MIG] Failed to migrate alert ID {} name {}.", alertConfigDTO.getId(), alertConfigDTO.getName(), e);
+ throw new RuntimeException(String.format("Error migrating alert ID %d due to %s",
+ alertConfigDTO.getId(), e.getMessage()), e);
+ }
+
+ LOGGER.info(String.format("[MIG] Successfully migrated alert %d %s", alertConfigDTO.getId(), alertName));
+ }
+
+ private void validateFunction(AnomalyFunctionDTO functionDTO) throws ValidationException {
+ if (functionDTO == null) {
+ throw new ValidationException("Couldn't find anomaly function.");
+ }
+
+ List<DatasetConfigDTO> datasetConfigDTOs = this.datasetConfigDAO.findByPredicate(Predicate.EQ("dataset", functionDTO.getCollection()));
+ if (datasetConfigDTOs.isEmpty()) {
+ throw new ValidationException("Dataset cannot be found for anomaly function " + functionDTO.getId());
+ }
+
+ List<MetricConfigDTO> metricConfigDTOs = this.metricConfigDAO.findByPredicate(Predicate.AND(
+ Predicate.EQ("name", functionDTO.getMetric()),
+ Predicate.EQ("dataset", functionDTO.getCollection())));
+ if (metricConfigDTOs.isEmpty()) {
+ throw new ValidationException("Metric cannot be found for anomaly function " + functionDTO.getId());
+ }
+ }
+
Map<String, Object> translateAlertToYaml(AlertConfigDTO alertConfigDTO) {
Map<String, Object> yamlConfigs = new LinkedHashMap<>();
@@ -386,11 +472,19 @@ public class DetectionMigrationResource {
List<String> detectionNames = new ArrayList<>();
List<Long> detectionIds = alertConfigDTO.getEmailConfig().getFunctionIds();
- try {
- detectionNames.addAll(detectionIds.stream().map(detectionId -> this.anomalyFunctionDAO.findByPredicate(
- Predicate.EQ("baseId", detectionId)).get(0).getFunctionName()).collect(Collectors.toList()));
- } catch (Exception e){
- throw new IllegalArgumentException("cannot find subscribed detection id, please check the function ids");
+ for (Long id : detectionIds) {
+ List<AnomalyFunctionDTO> functionDTOS = this.anomalyFunctionDAO.findByPredicate(Predicate.EQ("baseId", id));
+ if (functionDTOS.isEmpty()) {
+ LOGGER.warn("[MIG] Anomaly function " + id + " cannot be found while generating notification YAML from legacy notification.");
+ // Ignore missing anomaly functions
+ continue;
+ }
+
+ String functionName = functionDTOS.get(0).getFunctionName();
+ if (functionName.contains(MIGRATED_TAG)) {
+ functionName = functionName.substring(0, functionName.lastIndexOf(MIGRATED_TAG));
+ }
+ detectionNames.add(functionName);
}
yamlConfigs.put(PROP_DETECTION_NAMES, detectionNames);
@@ -434,47 +528,15 @@ public class DetectionMigrationResource {
Map<String, String> responseMessage = new HashMap<>();
for (AlertConfigDTO alertConfigDTO : alertConfigDTOList) {
- String alertName = alertConfigDTO.getName();
- LOGGER.info(String.format("[MIG] Migrating alert %d %s", alertConfigDTO.getId(), alertName));
try {
- // Skip if the alert is already migrated
- if (alertConfigDTO.getName().contains(MIGRATED_TAG)) {
- LOGGER.info(String.format("[MIG] Alert %d is already migrated. Skipping!", alertConfigDTO.getId()));
- continue;
- }
-
- // Translate the old alert and capture the state
- Map<String, Object> detectionAlertYaml = translateAlertToYaml(alertConfigDTO);
-
- // Migrate all the subscribed anomaly functions. Note that this will update the state of old anomaly functions.
- List<Long> detectionIds = ConfigUtils.getLongs(alertConfigDTO.getEmailConfig().getFunctionIds());
- for (long detectionId : detectionIds) {
- migrateLegacyAnomalyFunction(detectionId);
- }
-
- // Migrate the alert/notification group
- DetectionAlertConfigDTO alertConfig = new YamlDetectionAlertConfigTranslator(detectionConfigDAO).translate(detectionAlertYaml);
- detectionAlertConfigDAO.save(alertConfig);
- if (alertConfig.getId() == null) {
- throw new RuntimeException("Error while saving the migrated alert config for " + alertName);
- }
-
- // Update migration status and disable the old alert
- alertConfigDTO.setName(alertName + MIGRATED_TAG + "_" + alertConfig.getId());
- alertConfigDTO.setActive(false);
- int affectedRows = alertConfigDAO.update(alertConfigDTO);
- if (affectedRows == 0) {
- throw new RuntimeException("Alert migrated successfully but failed to disable and update the migration status"
- + " of the old alert. Recommend doing it manually." + " Migrated alert id " + alertConfig.getId());
- }
- LOGGER.info(String.format("[MIG] Successfully migrated alert %d %s", alertConfigDTO.getId(), alertName));
+ migrateLegacyNotification(alertConfigDTO);
} catch (Exception e) {
- LOGGER.error("[MIG] Failed to migrate alert ID {} name {}.", alertConfigDTO.getId(), alertName, e);
- responseMessage.put("Status of alert " + alertConfigDTO.getId(),
- String.format("Failed to migrate alert ID %d with name %s due to %s", alertConfigDTO.getId(), alertName,
- e.getMessage()));
// Skip migrating this alert and move on to the next
+ responseMessage.put("Status of alert " + alertConfigDTO.getId(),
+ String.format("Failed to migrate alert ID %d with name %s due to %s", alertConfigDTO.getId(),
+ alertConfigDTO.getName(), e.getMessage()));
}
+
}
if (responseMessage.isEmpty()) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
index b64b8ee..2ed1a9a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
@@ -199,7 +199,7 @@ public class YamlDetectionAlertConfigTranslator {
detectionConfigIds.addAll(detectionNames.stream().map(detectionName -> this.detectionConfigDAO.findByPredicate(
Predicate.EQ("name", detectionName)).get(0).getId()).collect(Collectors.toList()));
} catch (Exception e){
- throw new IllegalArgumentException("cannot find detection pipeline, please check the subscribed detections.");
+ throw new IllegalArgumentException("Cannot find detection pipeline, please check the subscribed detections.");
}
alertConfigDTO.setProperties(buildAlerterProperties(yamlAlertConfig, detectionConfigIds));
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResourceTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResourceTest.java
index 63327f9..85ea55e 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResourceTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResourceTest.java
@@ -94,7 +94,7 @@ public class DetectionMigrationResourceTest {
this.applicationDAO = DAORegistry.getInstance().getApplicationDAO();
migrationResource = new DetectionMigrationResource(
- anomalyFunctionDAO, alertConfigDAO, detectionConfigDAO, detectionAlertConfigDAO, datasetDAO, anomalyDAO);
+ anomalyFunctionDAO, alertConfigDAO, metricDAO, detectionConfigDAO, detectionAlertConfigDAO, datasetDAO, anomalyDAO);
DetectionRegistry.registerYamlConvertor(CompositePipelineConfigTranslator.class.getName(), "COMPOSITE");
DetectionRegistry.registerComponent(PercentageChangeRuleDetector.class.getName(), "PERCENTAGE_RULE");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org