You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2018/12/12 01:00:15 UTC

[incubator-pinot] branch master updated: [TE] yaml - create alert endpoint (#3609)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 80be2cb  [TE] yaml - create alert endpoint (#3609)
80be2cb is described below

commit 80be2cbe55a75d3b5f9fc9f68045b4445c7c7ce5
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Tue Dec 11 17:00:10 2018 -0800

    [TE] yaml - create alert endpoint (#3609)
    
    - The endpoint to set up detection and notification at the same time.
    - Endpoints to get a single yaml
    - Swagger docs
---
 .../java/com/linkedin/thirdeye/api/Constants.java  |   1 +
 .../thirdeye/detection/DetectionResource.java      |  25 +++
 .../thirdeye/detection/yaml/YamlResource.java      | 222 ++++++++++++++-------
 3 files changed, 181 insertions(+), 67 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/api/Constants.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/api/Constants.java
index b19913f..170fdbb 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/api/Constants.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/api/Constants.java
@@ -27,4 +27,5 @@ public class Constants {
   public static final String DASHBOARD_TAG = "Dashboard";
   public static final String ONBOARD_TAG = "Onboard";
   public static final String YAML_TAG = "Yaml";
+  public static final String DETECTION_TAG = "detection";
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
index dd63d06..2647e9f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
@@ -18,12 +18,15 @@ package com.linkedin.thirdeye.detection;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.linkedin.thirdeye.anomaly.detection.DetectionJobSchedulerUtils;
+import com.linkedin.thirdeye.api.Constants;
 import com.linkedin.thirdeye.constant.AnomalyResultSource;
 import com.linkedin.thirdeye.datalayer.bao.DatasetConfigManager;
+import com.linkedin.thirdeye.datalayer.bao.DetectionAlertConfigManager;
 import com.linkedin.thirdeye.datalayer.bao.DetectionConfigManager;
 import com.linkedin.thirdeye.datalayer.bao.EventManager;
 import com.linkedin.thirdeye.datalayer.bao.MergedAnomalyResultManager;
 import com.linkedin.thirdeye.datalayer.bao.MetricConfigManager;
+import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.datasource.DAORegistry;
@@ -35,6 +38,8 @@ import com.linkedin.thirdeye.datasource.loader.TimeSeriesLoader;
 import com.linkedin.thirdeye.detection.finetune.GridSearchTuningAlgorithm;
 import com.linkedin.thirdeye.detection.finetune.TuningAlgorithm;
 import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
 import com.wordnik.swagger.annotations.ApiParam;
 import java.text.ParseException;
 import java.util.ArrayList;
@@ -46,6 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
@@ -63,6 +69,7 @@ import org.slf4j.LoggerFactory;
 
 @Path("/detection")
 @Produces(MediaType.APPLICATION_JSON)
+@Api(tags = {Constants.DETECTION_TAG})
 public class DetectionResource {
   private static final Logger LOG = LoggerFactory.getLogger(DetectionResource.class);
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -77,6 +84,7 @@ public class DetectionResource {
   private final DetectionPipelineLoader loader;
   private final DataProvider provider;
   private final DetectionConfigManager configDAO;
+  private final DetectionAlertConfigManager detectionAlertConfigDAO;
 
   public DetectionResource() {
     this.metricDAO = DAORegistry.getInstance().getMetricConfigDAO();
@@ -84,6 +92,7 @@ public class DetectionResource {
     this.eventDAO = DAORegistry.getInstance().getEventDAO();
     this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
     this.configDAO = DAORegistry.getInstance().getDetectionConfigManager();
+    this.detectionAlertConfigDAO = DAORegistry.getInstance().getDetectionAlertConfigManager();
 
     TimeSeriesLoader timeseriesLoader =
         new DefaultTimeSeriesLoader(metricDAO, datasetDAO, ThirdEyeCacheRegistry.getInstance().getQueryCache());
@@ -97,6 +106,22 @@ public class DetectionResource {
     this.provider = new DefaultDataProvider(metricDAO, datasetDAO, eventDAO, anomalyDAO, timeseriesLoader, aggregationLoader, loader);
   }
 
+  @Path("/{id}")
+  @GET
+  @ApiOperation("get a detection config with yaml")
+  public Response getDetectionConfig(@ApiParam("the detection config id") @PathParam("id") long id){
+    DetectionConfigDTO config = this.configDAO.findById(id);
+    return Response.ok(config).build();
+  }
+
+  @Path("/notification/{id}")
+  @GET
+  @ApiOperation("get a detection alert config with yaml")
+  public Response getDetectionAlertConfig(@ApiParam("the detection alert config id") @PathParam("id") long id){
+    DetectionAlertConfigDTO config = this.detectionAlertConfigDAO.findById(id);
+    return Response.ok(config).build();
+  }
+
   @POST
   @Path("/preview")
   public Response detectionPreview(
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlResource.java
index 109da58..9db98a9 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/YamlResource.java
@@ -1,5 +1,6 @@
 package com.linkedin.thirdeye.detection.yaml;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.linkedin.thirdeye.api.Constants;
@@ -22,6 +23,8 @@ import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DefaultDataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipelineLoader;
 import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiImplicitParam;
+import com.wordnik.swagger.annotations.ApiImplicitParams;
 import com.wordnik.swagger.annotations.ApiOperation;
 import com.linkedin.thirdeye.detection.validators.DetectionAlertConfigValidator;
 import com.wordnik.swagger.annotations.ApiParam;
@@ -53,11 +56,11 @@ import org.yaml.snakeyaml.Yaml;
 @Api(tags = {Constants.YAML_TAG})
 public class YamlResource {
   protected static final Logger LOG = LoggerFactory.getLogger(YamlResource.class);
+  private static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   public static final String PROP_SUBS_GROUP_NAME = "subscriptionGroupName";
   public static final String PROP_DETECTION_NAME = "detectionName";
 
-
   private final DetectionConfigManager detectionConfigDAO;
   private final DetectionAlertConfigManager detectionAlertConfigDAO;
   private final YamlDetectionTranslatorLoader translatorLoader;
@@ -95,6 +98,110 @@ public class YamlResource {
     this.provider = new DefaultDataProvider(metricDAO, datasetDAO, eventDAO, anomalyDAO, timeseriesLoader, aggregationLoader, loader);
   }
 
+  @POST
+  @Path("/create-alert")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("Use yaml to create both notification and detection yaml. ")
+  public Response createYamlAlert(@ApiParam(value =  "a json contains both notification and detection yaml as string")  String payload,
+      @ApiParam("tuning window start time for tunable components") @QueryParam("startTime") long startTime,
+      @ApiParam("tuning window end time for tunable components") @QueryParam("endTime") long endTime) throws Exception{
+    Map<String, String> yamls = OBJECT_MAPPER.readValue(payload, Map.class);
+
+    if (StringUtils.isBlank(payload)){
+      return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("message", "Empty payload")).build();
+    }
+    if (!yamls.containsKey("detection")) {
+      return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("message", "detection yaml is missing")).build();
+    }
+    if (!yamls.containsKey("notification")){
+      return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("message", "notification yaml is missing")).build();
+    }
+
+    // get detection yaml
+    String detectionYaml = yamls.get("detection");
+
+    Map<String, Object> detectionYamlConfig;
+    try {
+      detectionYamlConfig = (Map<String, Object>) this.yaml.load(detectionYaml);
+    } catch (Exception e){
+      return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("message", "detection yaml parsing error, " + e.getMessage())).build();
+    }
+
+    // check if detection config already exists
+    String name = MapUtils.getString(detectionYamlConfig, PROP_DETECTION_NAME);
+    List<DetectionConfigDTO> detectionConfigDTOs = this.detectionConfigDAO.findByPredicate(
+        Predicate.EQ("name", name));
+    if (!detectionConfigDTOs.isEmpty()){
+      return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("message", "detection name already exist: " + name )).build();
+    }
+
+    HashMap<String, String> responseMessage = new HashMap<>();
+    DetectionConfigDTO detectionConfig =
+        buildDetectionConfigFromYaml(startTime, endTime, detectionYamlConfig, null, responseMessage);
+    if (detectionConfig == null) {
+      return Response.status(Response.Status.BAD_REQUEST).entity(responseMessage).build();
+    }
+    detectionConfig.setYaml(detectionYaml);
+    Long detectionConfigId = this.detectionConfigDAO.save(detectionConfig);
+    Preconditions.checkNotNull(detectionConfigId, "Save detection config failed");
+
+    // notification
+    // TODO: Inject detectionConfigId into detection alert config
+    DetectionAlertConfigDTO alertConfig = createDetectionAlertConfig(yamls.get("notification"), responseMessage);
+    if (alertConfig == null) {
+      // revert
+      this.detectionAlertConfigDAO.deleteById(detectionConfigId);
+      return Response.status(Response.Status.BAD_REQUEST).entity(responseMessage).build();
+    }
+    Long detectionAlertConfigId = this.detectionAlertConfigDAO.save(alertConfig);
+    if (detectionAlertConfigId == null){
+      // revert
+      this.detectionAlertConfigDAO.deleteById(detectionConfigId);
+      return Response.serverError().entity(ImmutableMap.of("message", "Save detection alert config failed")).build();
+    }
+    LOG.info("saved detection alert config id {}", detectionAlertConfigId);
+
+    return Response.ok().entity(ImmutableMap.of("detectionConfigId", detectionConfig.getId(), "detectionAlertConfigId", alertConfig.getId())).build();
+  }
+
+  /*
+   * Build the detection config from a yaml.
+   * Returns null if building or validation failed. Error messages stored in responseMessage.
+   */
+  private DetectionConfigDTO buildDetectionConfigFromYaml(long startTime, long endTime, Map<String, Object> yamlConfig,
+      DetectionConfigDTO existingDetectionConfig, Map<String, String> responseMessage) {
+    try{
+      YamlDetectionConfigTranslator translator = this.translatorLoader.from(yamlConfig, this.provider);
+      DetectionConfigDTO detectionConfig = translator.withTrainingWindow(startTime, endTime)
+          .withExistingDetectionConfig(existingDetectionConfig)
+          .generateDetectionConfig();
+      validatePipeline(detectionConfig);
+      return detectionConfig;
+    } catch (InvocationTargetException e){
+      // exception thrown in validate pipeline via reflection
+      LOG.error("Validate pipeline error", e);
+      responseMessage.put("message", e.getCause().getMessage());
+    } catch (Exception e) {
+      LOG.error("yaml translation error", e);
+      responseMessage.put("message", e.getMessage());
+    }
+    return null;
+  }
+
+  /*
+   * Init the pipeline to check if detection pipeline property is valid semantically.
+   */
+  private void validatePipeline(DetectionConfigDTO detectionConfig) throws Exception {
+    Long id = detectionConfig.getId();
+    // swap out id
+    detectionConfig.setId(-1L);
+    // try to load the detection pipeline and init all the components
+    this.loader.from(provider, detectionConfig, 0, 0);
+    // set id back
+    detectionConfig.setId(id);
+  }
+
   /**
    Set up a detection pipeline using a YAML config
    @param payload YAML config string
@@ -110,38 +217,33 @@ public class YamlResource {
       @ApiParam("yaml config") String payload,
       @ApiParam("tuning window start time for tunable components") @QueryParam("startTime") long startTime,
       @ApiParam("tuning window end time for tunable components") @QueryParam("endTime") long endTime) {
-    String errorMessage;
+    if (StringUtils.isBlank(payload)){
+      return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("message", "empty payload")).build();
+    }
+    Map<String, Object> yamlConfig;
     try {
-      Preconditions.checkArgument(StringUtils.isNotBlank(payload), "Empty payload");
-      Map<String, Object> yamlConfig = (Map<String, Object>) this.yaml.load(payload);
-
-      // retrieve id if detection config already exists
-      List<DetectionConfigDTO> detectionConfigDTOs =
-          this.detectionConfigDAO.findByPredicate(Predicate.EQ("name", MapUtils.getString(yamlConfig, PROP_DETECTION_NAME)));
-      DetectionConfigDTO existingDetectionConfig = null;
-      if (!detectionConfigDTOs.isEmpty()) {
-        existingDetectionConfig = detectionConfigDTOs.get(0);
-      }
-
-      YamlDetectionConfigTranslator translator = this.translatorLoader.from(yamlConfig, this.provider);
-      DetectionConfigDTO detectionConfig = translator.withTrainingWindow(startTime, endTime)
-          .withExistingDetectionConfig(existingDetectionConfig)
-          .generateDetectionConfig();
-      detectionConfig.setYaml(payload);
-      validatePipeline(detectionConfig);
-      Long detectionConfigId = this.detectionConfigDAO.save(detectionConfig);
-      Preconditions.checkNotNull(detectionConfigId, "Save detection config failed");
+      yamlConfig = (Map<String, Object>) this.yaml.load(payload);
+    } catch (Exception e){
+      return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("message", "detection yaml parsing error, " + e.getMessage())).build();
+    }
 
-      return Response.ok(detectionConfig).build();
-    } catch (InvocationTargetException e){
-      // exception thrown in validate pipeline via reflection
-      LOG.error("Validate pipeline error", e);
-      errorMessage = e.getCause().getMessage();
-    } catch (Exception e) {
-      LOG.error("yaml translation error", e);
-      errorMessage = e.getMessage();
+    // check if detection config already exists
+    String name = MapUtils.getString(yamlConfig, PROP_DETECTION_NAME);
+    List<DetectionConfigDTO> detectionConfigDTOs = this.detectionConfigDAO.findByPredicate(
+        Predicate.EQ("name", name));
+    if (!detectionConfigDTOs.isEmpty()){
+      return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("message", "detection name already exist: " + name )).build();
+    }
+    Map<String, String> responseMessage = new HashMap<>();
+    DetectionConfigDTO detectionConfig =
+        buildDetectionConfigFromYaml(startTime, endTime, yamlConfig, null, responseMessage);
+    if (detectionConfig == null) {
+      return Response.status(Response.Status.BAD_REQUEST).entity(responseMessage).build();
     }
-    return Response.status(400).entity(ImmutableMap.of("status", "400", "message", errorMessage)).build();
+    detectionConfig.setYaml(payload);
+    Long detectionConfigId = this.detectionConfigDAO.save(detectionConfig);
+    Preconditions.checkNotNull(detectionConfigId, "Save detection config failed");
+    return Response.ok(detectionConfig).build();
   }
 
   /**
@@ -162,33 +264,31 @@ public class YamlResource {
       @ApiParam("the detection config id to edit") @PathParam("id") long id,
       @ApiParam("tuning window start time for tunable components")  @QueryParam("startTime") long startTime,
       @ApiParam("tuning window end time for tunable components") @QueryParam("endTime") long endTime) {
-    String errorMessage;
+    if (StringUtils.isBlank(payload)){
+      return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("message", "empty payload")).build();
+    }
+    Map<String, Object> yamlConfig;
     try {
-      Preconditions.checkArgument(StringUtils.isNotBlank(payload), "Empty payload");
-      Map<String, Object> yamlConfig = (Map<String, Object>) this.yaml.load(payload);
-
-      DetectionConfigDTO existingDetectionConfig = this.detectionConfigDAO.findById(id);
-      Preconditions.checkArgument(existingDetectionConfig != null, "Existing detection config " + id + " not found");
-
-      YamlDetectionConfigTranslator translator = this.translatorLoader.from(yamlConfig, this.provider);
-      DetectionConfigDTO detectionConfig = translator.withTrainingWindow(startTime, endTime)
-          .withExistingDetectionConfig(existingDetectionConfig)
-          .generateDetectionConfig();
-      detectionConfig.setYaml(payload);
-      validatePipeline(detectionConfig);
-      Long detectionConfigId = this.detectionConfigDAO.save(detectionConfig);
-      Preconditions.checkNotNull(detectionConfigId, "Save detection config failed");
+      yamlConfig = (Map<String, Object>) this.yaml.load(payload);
+    } catch (Exception e){
+      return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("message", "detection yaml parsing error, " + e.getMessage())).build();
+    }
 
-      return Response.ok(detectionConfig).build();
-    } catch (InvocationTargetException e){
-      // exception thrown in validate pipeline via reflection
-      LOG.error("Validate pipeline error", e);
-      errorMessage = e.getCause().getMessage();
-    } catch (Exception e) {
-      LOG.error("yaml translation error", e);
-      errorMessage = e.getMessage();
+    Map<String, String> responseMessage = new HashMap<>();
+    // retrieve id if detection config already exists
+    DetectionConfigDTO existingDetectionConfig = this.detectionConfigDAO.findById(id);
+    if (existingDetectionConfig == null) {
+      return Response.status(Response.Status.BAD_REQUEST).entity(responseMessage).build();
+    }
+    DetectionConfigDTO detectionConfig =
+        buildDetectionConfigFromYaml(startTime, endTime, yamlConfig, existingDetectionConfig, responseMessage);
+    if (detectionConfig == null) {
+      return Response.status(Response.Status.BAD_REQUEST).entity(responseMessage).build();
     }
-    return Response.status(400).entity(ImmutableMap.of("status", "400", "message", errorMessage)).build();
+    detectionConfig.setYaml(payload);
+    Long detectionConfigId = this.detectionConfigDAO.save(detectionConfig);
+    Preconditions.checkNotNull(detectionConfigId, "Save detection config failed");
+    return Response.ok(detectionConfig).build();
   }
 
   @POST
@@ -224,18 +324,6 @@ public class YamlResource {
     return Response.ok().entity(responseMessage).build();
   }
 
-  /*
-   * Init the pipeline to check if detection pipeline property is valid semantically.
-   */
-  private void validatePipeline(DetectionConfigDTO detectionConfig) throws Exception {
-    Long id = detectionConfig.getId();
-    // swap out id
-    detectionConfig.setId(-1L);
-    // try to load the detection pipeline and init all the components
-    this.loader.from(provider, detectionConfig, 0, 0);
-    // set id back
-    detectionConfig.setId(id);
-  }
 
   @SuppressWarnings("unchecked")
   public DetectionAlertConfigDTO createDetectionAlertConfig(String yamlAlertConfig, Map<String, String> responseMessage ) {
@@ -339,7 +427,7 @@ public class YamlResource {
   }
 
   /**
-   List all yaml configurations enhanced with detection config id, isActive and createBy information.
+   List all yaml configurations as JSON. enhanced with detection config id, isActive and createBy information.
    @param id id of a specific detection config yaml to list (optional)
    @return the yaml configuration converted in to JSON, with enhanced information from detection config DTO.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org