You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/12/05 04:25:02 UTC

incubator-eagle git commit: [MINOR] support group by in siddhi pattern match

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 3a84a2c62 -> e5e215e0b


[MINOR] support group by in siddhi pattern match

Author: wujinhu <wu...@126.com>

Closes #708 from wujinhu/EAGLE-792.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/e5e215e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/e5e215e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/e5e215e0

Branch: refs/heads/master
Commit: e5e215e0bc0d4829e120069cab92922e0fef5f5a
Parents: 3a84a2c
Author: wujinhu <wu...@126.com>
Authored: Mon Dec 5 12:24:55 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Mon Dec 5 12:24:55 2016 +0800

----------------------------------------------------------------------
 .../interpreter/PolicyExecutionPlannerImpl.java | 35 +++++++++++++++++---
 .../interpreter/PolicyInterpreterTest.java      | 34 +++++++++++++++++++
 2 files changed, 64 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e5e215e0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
index 82bb64f..1f46298 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
@@ -32,10 +32,8 @@ import org.wso2.siddhi.query.api.execution.ExecutionElement;
 import org.wso2.siddhi.query.api.execution.query.Query;
 import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
 import org.wso2.siddhi.query.api.execution.query.input.handler.Window;
-import org.wso2.siddhi.query.api.execution.query.input.stream.InputStream;
-import org.wso2.siddhi.query.api.execution.query.input.stream.JoinInputStream;
-import org.wso2.siddhi.query.api.execution.query.input.stream.SingleInputStream;
-import org.wso2.siddhi.query.api.execution.query.input.stream.StateInputStream;
+import org.wso2.siddhi.query.api.execution.query.input.state.*;
+import org.wso2.siddhi.query.api.execution.query.input.stream.*;
 import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
 import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
 import org.wso2.siddhi.query.api.execution.query.selection.Selector;
@@ -61,6 +59,7 @@ class PolicyExecutionPlannerImpl implements PolicyExecutionPlanner {
 
     private final String executionPlan;
     private final Map<String,List<StreamColumn>> effectiveInputStreams;
+    private final Map<String, String> effectiveInputStreamsAlias;
     private final Map<String,List<StreamColumn>> effectiveOutputStreams;
     private final Map<String,StreamPartition> effectivePartitions;
     private final PolicyExecutionPlan policyExecutionPlan;
@@ -68,6 +67,7 @@ class PolicyExecutionPlannerImpl implements PolicyExecutionPlanner {
     public PolicyExecutionPlannerImpl(String executionPlan) throws Exception {
         this.executionPlan = executionPlan;
         this.effectiveInputStreams = new HashMap<>();
+        this.effectiveInputStreamsAlias = new HashMap<>();
         this.effectiveOutputStreams = new HashMap<>();
         this.effectivePartitions = new HashMap<>();
         this.policyExecutionPlan = doParse();
@@ -173,6 +173,9 @@ class PolicyExecutionPlannerImpl implements PolicyExecutionPlanner {
                                 for (String streamId : inputStream.getUniqueStreamIds()) {
                                     streamGroupBy.put(streamId, new ArrayList<>());
                                 }
+
+                                collectStreamReferenceIdMapping(((StateInputStream)inputStream).getStateElement());
+
                                 for (Variable variable : groupBy) {
                                     // Not stream not set, then should be all streams' same field
                                     if (variable.getStreamId() == null) {
@@ -180,7 +183,12 @@ class PolicyExecutionPlannerImpl implements PolicyExecutionPlanner {
                                             streamGroupBy.get(streamId).add(variable);
                                         }
                                     } else {
-                                        String streamId = retrieveStreamId(variable, effectiveInputStreams,queryLevelAliasToStreamMapping);
+                                        String streamId = variable.getStreamId();
+                                        if (!this.effectiveInputStreamsAlias.containsKey(streamId)) {
+                                            streamId = retrieveStreamId(variable, effectiveInputStreams,queryLevelAliasToStreamMapping);
+                                        } else {
+                                            streamId = this.effectiveInputStreamsAlias.get(streamId);
+                                        }
                                         if (streamGroupBy.containsKey(streamId)) {
                                             streamGroupBy.get(streamId).add(variable);
                                         } else {
@@ -228,6 +236,23 @@ class PolicyExecutionPlannerImpl implements PolicyExecutionPlanner {
         return policyExecutionPlan;
     }
 
+    private void collectStreamReferenceIdMapping(StateElement stateElement) {
+        if (stateElement instanceof LogicalStateElement) {
+            collectStreamReferenceIdMapping(((LogicalStateElement) stateElement).getStreamStateElement1());
+            collectStreamReferenceIdMapping(((LogicalStateElement) stateElement).getStreamStateElement2());
+        } else if (stateElement instanceof CountStateElement) {
+            collectStreamReferenceIdMapping(((CountStateElement) stateElement).getStreamStateElement());
+        } else if (stateElement instanceof EveryStateElement) {
+            collectStreamReferenceIdMapping(((EveryStateElement) stateElement).getStateElement());
+        } else if (stateElement instanceof NextStateElement) {
+            collectStreamReferenceIdMapping(((NextStateElement) stateElement).getStateElement());
+            collectStreamReferenceIdMapping(((NextStateElement) stateElement).getNextStateElement());
+        } else if (stateElement instanceof StreamStateElement) {
+            BasicSingleInputStream basicSingleInputStream = ((StreamStateElement) stateElement).getBasicSingleInputStream();
+            this.effectiveInputStreamsAlias.put(basicSingleInputStream.getStreamReferenceId(), basicSingleInputStream.getStreamId());
+        }
+    }
+
     private String retrieveStreamId(Variable variable, Map<String, List<StreamColumn>> streamMap, Map<String, SingleInputStream> aliasMap) {
         Preconditions.checkNotNull(variable.getStreamId(), "streamId");
         if (streamMap.containsKey(variable.getStreamId()) && aliasMap.containsKey(variable.getStreamId())) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e5e215e0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
index a10538e..f68a295 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
@@ -411,4 +411,38 @@ public class PolicyInterpreterTest {
         streamDefinition.setColumns(columns);
         return streamDefinition;
     }
+
+    @Test
+    public void testValidPolicyWithPattern() {
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        String policy =
+                "from every a = HADOOP_JMX_METRIC_STREAM_1[component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] " +
+                        "-> b = HADOOP_JMX_METRIC_STREAM_1[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, \"long\") > convert(a.value, \"long\") ] " +
+                        "select b.metric, b.host as host, convert(b.value, \"long\") as newNumOfMissingBlocks, convert(a.value, \"long\") as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site " +
+                        "group by b.metric insert into HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT;";
+        definition.setValue(policy);
+        definition.setInputStreams(policyDefinition.getInputStreams());
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() {
+            {
+                put("HADOOP_JMX_METRIC_STREAM_1", mockStreamDefinition("HADOOP_JMX_METRIC_STREAM_1"));
+            }
+        });
+        Assert.assertTrue(validation.isSuccess());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size());
+        Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
+        Assert.assertEquals(StreamPartition.Type.GROUPBY, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getType());
+        Assert.assertArrayEquals(new String[]{"metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray());
+        Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId());
+    }
 }
\ No newline at end of file