You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/11/08 12:06:54 UTC

incubator-eagle git commit: EAGLE-746 : PolicyValidation is insufficient, and will cause blocking for extended handler

Repository: incubator-eagle
Updated Branches:
  refs/heads/master a5537c0c2 -> cb3bf7b60


EAGLE-746 : PolicyValidation is insufficient, and will cause blocking for extended handler

Author: ralphsu

This closes #625


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/cb3bf7b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/cb3bf7b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/cb3bf7b6

Branch: refs/heads/master
Commit: cb3bf7b6063adff6ef566650b7a438dfe4b52290
Parents: a5537c0
Author: Ralph, Su <su...@gmail.com>
Authored: Tue Nov 8 19:45:49 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Tue Nov 8 20:08:45 2016 +0800

----------------------------------------------------------------------
 .../engine/interpreter/PolicyInterpreter.java   |  16 +-
 .../interpreter/PolicyInterpreterTest.java      |  39 +-
 .../src/test/resources/interpreter/policy.json  |  36 ++
 .../src/test/resources/interpreter/streams.json | 524 +++++++++++++++++++
 4 files changed, 606 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cb3bf7b6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
index 8aec294..4add3ff 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreter.java
@@ -20,6 +20,7 @@ import com.google.common.base.Preconditions;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
 import org.apache.eagle.alert.engine.evaluator.impl.SiddhiDefinitionAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,12 +89,15 @@ public class PolicyInterpreter {
                 }
             }
 
-            PolicyExecutionPlan policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions);
-            // Validate output
-            if (policy.getOutputStreams() != null) {
-                for (String outputStream : policy.getOutputStreams()) {
-                    if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) {
-                        throw new StreamNotDefinedException("Output stream " + outputStream + " not defined");
+            PolicyExecutionPlan policyExecutionPlan = null;
+            if (PolicyStreamHandlers.SIDDHI_ENGINE.equalsIgnoreCase(policy.getDefinition().getType())) {
+                policyExecutionPlan = parseExecutionPlan(policy.getDefinition().getValue(), inputDefinitions);
+                // Validate output
+                if (policy.getOutputStreams() != null) {
+                    for (String outputStream : policy.getOutputStreams()) {
+                        if (!policyExecutionPlan.getOutputStreams().containsKey(outputStream)) {
+                            throw new StreamNotDefinedException("Output stream " + outputStream + " not defined");
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cb3bf7b6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
index cf1a7e9..a10538e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
@@ -16,13 +16,14 @@
  */
 package org.apache.eagle.alert.engine.interpreter;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.interpreter.PolicyExecutionPlan;
-import org.apache.eagle.alert.engine.interpreter.PolicyInterpreter;
-import org.apache.eagle.alert.engine.interpreter.PolicyValidationResult;
+import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
 import org.junit.Assert;
 import org.junit.Test;
 import org.wso2.siddhi.core.exception.DefinitionNotExistException;
@@ -364,6 +365,38 @@ public class PolicyInterpreterTest {
         );
     }
 
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    @Test
+    public void testLeftJoin() throws  Exception {
+        PolicyDefinition def = mapper.readValue(PolicyInterpreterTest.class.getResourceAsStream("/interpreter/policy.json"), PolicyDefinition.class);
+        ArrayNode array = (ArrayNode)mapper.readTree(PolicyInterpreterTest.class.getResourceAsStream("/interpreter/streams.json"));
+        Map<String, StreamDefinition> allDefinitions = new HashMap<>();
+        for(JsonNode node : array) {
+            StreamDefinition streamDef = mapper.readValue(node.toString(), StreamDefinition.class);
+            allDefinitions.put(streamDef.getStreamId(), streamDef);
+        }
+        PolicyValidationResult result = PolicyInterpreter.validate(def, allDefinitions);
+        Assert.assertTrue(result.isSuccess());
+    }
+
+    @Test
+    public void testExtendPolicy() throws  Exception {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test-extend-policy");
+        policyDefinition.setInputStreams(Collections.singletonList("INPUT_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("OUTPUT_STREAM_1"));
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE);
+        policyDefinition.setDefinition(definition);
+
+        Map<String, StreamDefinition> allDefinitions = new HashMap<>();
+        allDefinitions.put("INPUT_STREAM_1", mockStreamDefinition("INPUT_STREAM_1"));
+        PolicyValidationResult result = PolicyInterpreter.validate(policyDefinition, allDefinitions);
+        Assert.assertTrue(result.isSuccess());
+    }
+
+
     // --------------
     // Helper Methods
     // --------------

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cb3bf7b6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/interpreter/policy.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/interpreter/policy.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/interpreter/policy.json
new file mode 100644
index 0000000..72eabad
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/interpreter/policy.json
@@ -0,0 +1,36 @@
+{
+  "name": "network_symptomatic_join",
+  "description": "Set parentKey of child alerts as docId of device down alert",
+  "inputStreams": [
+    "networkCorrelateIntermediumStream",
+    "deviceDownAlertStream"
+  ],
+  "outputStreams": [
+    "symptomaticAlertOutputStream"
+  ],
+  "definition": {
+    "type": "siddhi",
+    "value": "from networkCorrelateIntermediumStream#window.time(10 min) as left join deviceDownAlertStream[state=='open']#window.time(10 min) as right on left.linkedSwitch == right.entity select left.name, left.pod, left.docId, left.cause, left.firstOccurrenceTime, left.bubble, left.timestamp, left.severity, left.componentType, left.linkedSwitch, left.alertSource, left.entityType, left.ip, left.alertKey, left.message, 'symptomaticAlertOutputStream' as streamName, left.component, left.linkedPort, left.namespace, left.entity, left.dc, left.status, left.endTime, left.lastModifiedTime, right.docId as parentKey insert into symptomaticAlertOutputStream;",
+    "properties": {},
+    "inputStreams": [],
+    "outputStreams": []
+  },
+  "policyStatus": "ENABLED",
+  "partitionSpec": [
+    {
+      "streamId": "networkCorrelateIntermediumStream",
+      "type": "GROUPBY",
+      "columns": [
+        "linkedSwitch"
+      ]
+    },
+    {
+      "streamId": "deviceDownAlertStream",
+      "type": "GROUPBY",
+      "columns": [
+        "entity"
+      ]
+    }
+  ],
+  "parallelismHint": 0
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/cb3bf7b6/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/interpreter/streams.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/interpreter/streams.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/interpreter/streams.json
new file mode 100644
index 0000000..ed7c78f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/interpreter/streams.json
@@ -0,0 +1,524 @@
+[
+  {
+    "streamId": "deviceDownAlertStream",
+    "description": "Stream definition for symptomatic parent alerts",
+    "validate": false,
+    "timeseries": false,
+    "dataSource": "network_aggregate_alert_output_datasource",
+    "columns": [
+      {
+        "name": "docId",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": "the docId for correlated alerts"
+      },
+      {
+        "name": "parentKey",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": "the parent key for correlated alert"
+      },
+      {
+        "name": "state",
+        "type": "STRING",
+        "defaultValue": "closed",
+        "required": true,
+        "description": "the alert state"
+      },
+      {
+        "name": "name",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "namespace",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "timestamp",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "lastModifiedTime",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "severity",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": "severity for symptomatic alert"
+      },
+      {
+        "name": "severityCode",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": "severity code for symptomatic alert"
+      },
+      {
+        "name": "streamName",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "entity",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "pod",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "bubble",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "dc",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "cause",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "message",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "count",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": false
+      },
+      {
+        "name": "firstOccurrenceTime",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": false
+      },
+      {
+        "name": "alertKey",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      }
+    ]
+  },
+  {
+    "streamId": "networkCorrelateIntermediumStream",
+    "description": "The template aggregated *OUTPUT* data stream schema for network correlation intermedium step",
+    "validate": false,
+    "timeseries": false,
+    "dataSource": "network_aggregate_alert_output_datasource",
+    "columns": [
+      {
+        "name": "docId",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": "the docId for aggregated alerts"
+      },
+      {
+        "name": "firstOccurrenceTime",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true,
+        "description": "firstOccurTime for aggregated alert"
+      },
+      {
+        "name": "endTime",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true,
+        "description": "the end time of aggregated alert, 0 as default"
+      },
+      {
+        "name": "alertKey",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": "the alert key for aggregated alert"
+      },
+      {
+        "name": "parentKey",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": "the parent key for aggregated alert"
+      },
+      {
+        "name": "state",
+        "type": "STRING",
+        "defaultValue": "closed",
+        "required": true,
+        "description": "the alert state"
+      },
+      {
+        "name": "name",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "namespace",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "timestamp",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "lastModifiedTime",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "severity",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": "severity for aggregated alert"
+      },
+      {
+        "name": "alertSource",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "streamName",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "entity",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "entityType",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "ip",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "pod",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "bubble",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "dc",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "component",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "componentType",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "status",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "cause",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "message",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "linkedPort",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "linkedSwitch",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "severityCode",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false,
+        "description": ""
+      },
+      {
+        "name": "count",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": false,
+        "description": ""
+      }
+    ]
+  },
+  {
+    "streamId": "symptomaticAlertOutputStream",
+    "description": "Stream definition for symptomatic child alerts",
+    "validate": false,
+    "timeseries": false,
+    "dataSource": "network_aggregate_alert_output_datasource",
+    "columns": [
+      {
+        "name": "docId",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": "the docId for symptomatic alerts"
+      },
+      {
+        "name": "firstOccurrenceTime",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true,
+        "description": "firstOccurTime for symptomatic alert"
+      },
+      {
+        "name": "endTime",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true,
+        "description": "the end time of symptomatic alert, 0 as default"
+      },
+      {
+        "name": "alertKey",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": "the alert key for symptomatic alert"
+      },
+      {
+        "name": "parentKey",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": "the parent key for symptomatic alert"
+      },
+      {
+        "name": "state",
+        "type": "STRING",
+        "defaultValue": "closed",
+        "required": true,
+        "description": "the alert state"
+      },
+      {
+        "name": "name",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "namespace",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "timestamp",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "lastModifiedTime",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "severity",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": "severity for symptomatic alert"
+      },
+      {
+        "name": "severityCode",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": "severity code for symptomatic alert"
+      },
+      {
+        "name": "streamName",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "entity",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "entityType",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "ip",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "pod",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "bubble",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "dc",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "component",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "componentType",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "status",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "alertSource",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": false
+      },
+      {
+        "name": "cause",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "message",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "linkedPort",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "linkedSwitch",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true,
+        "description": ""
+      },
+      {
+        "name": "count",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true,
+        "description": ""
+      }
+    ]
+  }
+]
\ No newline at end of file