You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/02/27 10:34:01 UTC

eagle git commit: [MINOR] support sequence in policy definition

Repository: eagle
Updated Branches:
  refs/heads/master a42f7891c -> af33ae3c3


[MINOR] support sequence in policy definition

Author: wujinhu <wu...@126.com>

Closes #844 from wujinhu/EAGLE-847.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/af33ae3c
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/af33ae3c
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/af33ae3c

Branch: refs/heads/master
Commit: af33ae3c364aa3edf57df5469c97ab28cf0ce3c1
Parents: a42f789
Author: wujinhu <wu...@126.com>
Authored: Mon Feb 27 18:33:52 2017 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Mon Feb 27 18:33:52 2017 +0800

----------------------------------------------------------------------
 .../interpreter/PolicyExecutionPlannerImpl.java |  3 +-
 .../interpreter/PolicyInterpreterTest.java      | 72 ++++++++++++++++++++
 2 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/af33ae3c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
index e30b3de..4e6901d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlannerImpl.java
@@ -199,7 +199,8 @@ class PolicyExecutionPlannerImpl implements PolicyExecutionPlanner {
                                 for (Map.Entry<String, List<Variable>> entry : streamGroupBy.entrySet()) {
                                     if (entry.getValue().size() > 0) {
                                         StreamPartition partition = generatePartition(entry.getKey(), null, Arrays.asList(entry.getValue().toArray(new Variable[entry.getValue().size()])));
-                                        if (((StateInputStream) inputStream).getStateType().equals(StateInputStream.Type.PATTERN)) {
+                                        if (((StateInputStream) inputStream).getStateType().equals(StateInputStream.Type.PATTERN)
+                                                || ((StateInputStream) inputStream).getStateType().equals(StateInputStream.Type.SEQUENCE)) {
                                             if (effectivePartitions.containsKey(partition.getStreamId())) {
                                                 StreamPartition existingPartition = effectivePartitions.get(partition.getStreamId());
                                                 if (!existingPartition.equals(partition)

http://git-wip-us.apache.org/repos/asf/eagle/blob/af33ae3c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
index 1553e17..4047fc1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/interpreter/PolicyInterpreterTest.java
@@ -483,4 +483,76 @@ public class PolicyInterpreterTest {
         Assert.assertArrayEquals(new String[]{"site", "host", "component", "metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray());
         Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId());
     }
+
+    @Test
+    public void testValidPolicyWithSequence() { // Validates a policy whose query declares two comma-separated states (a, b) over one stream.
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        String policy = // state b must match a on component, metric and host and carry a larger long value; output is grouped by b.metric
+                "from every a = HADOOP_JMX_METRIC_STREAM_1[component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] " +
+                        ", b = HADOOP_JMX_METRIC_STREAM_1[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, \"long\") > convert(a.value, \"long\") ] " +
+                        "select b.metric, b.host as host, convert(b.value, \"long\") as newNumOfMissingBlocks, convert(a.value, \"long\") as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site " +
+                        "group by b.metric insert into HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT;";
+        definition.setValue(policy);
+        definition.setInputStreams(policyDefinition.getInputStreams()); // the definition reuses the stream lists set on the policy above
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { // anonymous HashMap subclass holding the one mocked stream definition
+            {
+                put("HADOOP_JMX_METRIC_STREAM_1", mockStreamDefinition("HADOOP_JMX_METRIC_STREAM_1"));
+            }
+        });
+        Assert.assertTrue(validation.isSuccess());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getOutputStreams().size());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size()); // both states read the same stream; exactly one partition is expected
+        Assert.assertNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec()); // this query has no window clause; the test expects no sort spec
+        Assert.assertEquals(StreamPartition.Type.GROUPBY, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getType());
+        Assert.assertArrayEquals(new String[]{"metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray()); // expected to mirror "group by b.metric" without the state alias
+        Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId());
+    }
+
+    @Test
+    public void testValidPolicyWithSequenceSort() { // Validates a two-query policy: a windowed query plus a two-state (a, b) query on the same stream; expects a sort spec on the partition.
+        PolicyDefinition policyDefinition = new PolicyDefinition();
+        policyDefinition.setName("test_policy");
+        policyDefinition.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1"));
+        policyDefinition.setOutputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT"));
+
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        definition.setType("siddhi");
+        String policy = // query 1: 1 min externalTime window inserted into "temp"; query 2: states a/b grouped by b.site, b.host, b.component, b.metric
+                "from HADOOP_JMX_METRIC_STREAM_1[metric == \"hadoop.namenode.dfs.missingblocks\"]#window.externalTime(timestamp, 1 min) " +
+                        "select * group by site, host, component, metric insert into temp;\n" +
+                        "\n" +
+                        "from every a = HADOOP_JMX_METRIC_STREAM_1[metric == \"hadoop.namenode.dfs.missingblocks\"], b = HADOOP_JMX_METRIC_STREAM_1[b.component == a.component and b.metric == a.metric and b.host == a.host and convert(b.value, \"long\") > convert(a.value, \"long\") ] " +
+                        "select b.site, b.host, b.component, b.metric, convert(b.value, \"long\") as newNumOfMissingBlocks, convert(a.value, \"long\") as oldNumOfMissingBlocks, max(b.timestamp) as timestamp " +
+                        "group by b.site, b.host, b.component, b.metric insert into HADOOP_JMX_METRIC_STREAM_1_MISS_BLOCKS_OUT;";
+        definition.setValue(policy);
+        definition.setInputStreams(policyDefinition.getInputStreams()); // the definition reuses the stream lists set on the policy above
+        definition.setOutputStreams(policyDefinition.getOutputStreams());
+        policyDefinition.setDefinition(definition);
+
+        PolicyValidationResult validation = PolicyInterpreter.validate(policyDefinition, new HashMap<String, StreamDefinition>() { // anonymous HashMap subclass holding the one mocked stream definition
+            {
+                put("HADOOP_JMX_METRIC_STREAM_1", mockStreamDefinition("HADOOP_JMX_METRIC_STREAM_1"));
+            }
+        });
+        Assert.assertTrue(validation.isSuccess());
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getInputStreams().size());
+        Assert.assertEquals(2, validation.getPolicyExecutionPlan().getOutputStreams().size()); // presumably "temp" plus the declared output stream - the test only checks the count
+        Assert.assertEquals(1, validation.getPolicyExecutionPlan().getStreamPartitions().size()); // both queries group the same stream by the same four columns; one partition is expected
+        Assert.assertNotNull(validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec());
+        Assert.assertEquals(60000, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec().getWindowPeriodMillis()); // the "1 min" window expressed in milliseconds
+        Assert.assertEquals(12000, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getSortSpec().getWindowMargin()); // expected margin is a fifth of the window; how it is derived is not visible in this diff
+        Assert.assertEquals(StreamPartition.Type.GROUPBY, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getType());
+        Assert.assertArrayEquals(new String[]{"site", "host", "component", "metric"}, validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getColumns().toArray());
+        Assert.assertEquals("HADOOP_JMX_METRIC_STREAM_1", validation.getPolicyExecutionPlan().getStreamPartitions().get(0).getStreamId());
+    }
 }
\ No newline at end of file