Posted to commits@eagle.apache.org by ra...@apache.org on 2016/10/05 20:50:14 UTC

incubator-eagle git commit: [EAGLE-590]: AlertEngine: the kafka_spout might be dropped by a metadata update when the system acks a tuple

Repository: incubator-eagle
Updated Branches:
  refs/heads/master f12c82f8a -> f467954ac


[EAGLE-590]: AlertEngine: the kafka_spout might be dropped by a metadata update when the system acks a tuple

Author: ralphsu

This closes #475
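
For context, a minimal sketch of the failure mode this patch guards against. The class and member names below are illustrative stand-ins, not the actual Eagle classes: a metadata update removes a topic's spout wrapper from the per-topic map, then a late ack arrives for a tuple that originated from that topic, and without a null check the ack path throws a NullPointerException.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative sketch only; CorrelationSpout keeps a comparable per-topic
    // wrapper map that a spec/metadata update can shrink at any time.
    public class LateAckSketch {

        // Stand-in for the per-topic spout wrapper map.
        static final Map<String, Object> kafkaSpoutList = new ConcurrentHashMap<>();

        static void onMetadataUpdate() {
            // Topic removed from the spec, so its wrapper is dropped.
            kafkaSpoutList.remove("topicA");
        }

        static void ack(String topic, Object messageId) {
            Object spout = kafkaSpoutList.get(topic);
            if (spout != null) {
                // Guarded path: only ack if the wrapper still exists.
                System.out.println("acked " + messageId + " on " + topic);
            } else {
                // Without this guard, spout.ack(...) would NPE here.
                System.out.println("spout for " + topic + " already removed; ack skipped");
            }
        }

        public static void main(String[] args) {
            kafkaSpoutList.put("topicA", new Object());
            onMetadataUpdate();   // spout dropped by metadata update
            ack("topicA", 42);    // late ack no longer throws
        }
    }

The commit applies the same null guard to both ack() and fail() in CorrelationSpout, as shown in the diff below.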


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f467954a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f467954a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f467954a

Branch: refs/heads/master
Commit: f467954acc427763d5ab900214018a6be4830f02
Parents: f12c82f
Author: Ralph, Su <su...@gmail.com>
Authored: Thu Oct 6 02:13:56 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Thu Oct 6 04:50:31 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/spout/CorrelationSpout.java    |   8 +-
 .../alert/engine/router/TestAlertBolt.java      | 151 +++++++++++++++++++
 2 files changed, 157 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f467954a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
index d4266a3..67074ce 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
@@ -183,7 +183,9 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
         // decode and get topic
         KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
         KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
-        spout.ack(id.id);
+        if (spout !=  null) {
+            spout.ack(id.id);
+        }
     }
 
     @Override
@@ -192,7 +194,9 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
         KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
         LOG.error("Failing message {}, with topic {}", msgId, id.topic);
         KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
-        spout.fail(id.id);
+        if (spout !=  null) {
+            spout.fail(id.id);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f467954a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index 4bec98d..2b9144d 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -435,5 +435,156 @@ public class TestAlertBolt {
         Assert.assertTrue(recieved.get());
     }
 
+    @Test
+    public void testMultiStreamDefinition() throws Exception {
+        final AtomicInteger alertCount = new AtomicInteger();
+        final Semaphore mutex = new Semaphore(0);
+        OutputCollector collector = new OutputCollector(new IOutputCollector() {
+            int count = 0;
+
+            @Override
+            public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+                System.out.println("=====output collector==========");
+                alertCount.incrementAndGet();
+                mutex.release();
+                Assert.assertTrue("symptomaticAlertOutputStream".equals((String) tuple.get(0))
+                    || "deviceDownAlertStream".equals((String) tuple.get(0)));
+                AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
+                System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
+
+                System.out.println("**********output collector end***********");
+                return null;
+            }
+
+            @Override
+            public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+            }
+
+            @Override
+            public void ack(Tuple input) {
+            }
+
+            @Override
+            public void fail(Tuple input) {
+            }
+
+            @Override
+            public void reportError(Throwable error) {
+            }
+        });
+
+
+        AlertBolt bolt = createAlertBolt(collector);
+
+        // construct StreamPartition
+        StreamPartition sp = new StreamPartition();
+        sp.setColumns(Collections.singletonList("col1"));
+        sp.setStreamId("correlatedStream");
+        sp.setType(StreamPartition.Type.GROUPBY);
+
+        pushAlertBoltSpec(sp, bolt);
+
+        // now emit
+        // construct GeneralTopologyContext
+        GeneralTopologyContext context = mock(GeneralTopologyContext.class);
+        int taskId = 1;
+        when(context.getComponentId(taskId)).thenReturn("comp1");
+        when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
+
+        long base = System.currentTimeMillis();
+        int i = 0;
+        String linkedSwitch = "lvs-ra-01";
+
+        // construct event with "value1"
+        StreamEvent event1 = new StreamEvent();
+        event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00") * 1000);
+        event1.setMetaVersion("version1");
+        Object[] data = new Object[] { base , "child-"+ (i++), "", linkedSwitch};
+        event1.setData(data);
+        event1.setStreamId("correlatedStream");
+        PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp, 1001);
+
+        // construct another event with "value1"
+        StreamEvent event2 = new StreamEvent();
+        event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:05:00") * 1000);
+        event2.setMetaVersion("version1");
+        data = new Object[] { base , "child-"+ (i++), "", linkedSwitch};
+        event2.setData(data);
+        event2.setStreamId("correlatedStream");
+        PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp, 1001);
+
+        Tuple input = new TupleImpl(context, Collections.singletonList(partitionedEvent1), taskId, "default");
+        Tuple input2 = new TupleImpl(context, Collections.singletonList(partitionedEvent2), taskId, "default");
+        bolt.execute(input);
+        bolt.execute(input2);
+        Assert.assertTrue("Timeout to acquire mutex in 5s", mutex.tryAcquire(1, 5, TimeUnit.SECONDS));
+        Assert.assertEquals(3, alertCount.get());
+        bolt.cleanup();
+    }
+
+    private void pushAlertBoltSpec(StreamPartition sp, AlertBolt bolt) {
+        Map<String, StreamDefinition> sds = new HashMap<>();
+        sds.put("correlatedStream", createCorrelateStream("correlatedStream"));
+        sds.put("symptomaticAlertOutputStream", createCorrelateStream("symptomaticAlertOutputStream")); // output of updated correlatedStream
+        sds.put("deviceDownAlertStream", createCorrelateStream("deviceDownAlertStream"));
+
+        PolicyDefinition pd = new PolicyDefinition();
+        pd.setName("network_symptomatic");
+        pd.setInputStreams(Arrays.asList("correlatedStream"));
+        pd.setOutputStreams(Arrays.asList("deviceDownAlertStream", "symptomaticAlertOutputStream"));
+
+        pd.setPartitionSpec(Arrays.asList(sp));
+
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        def.setType(PolicyStreamHandlers.SIDDHI_ENGINE);
+        def.setValue("from correlatedStream#window.externalTime(timestamp, 3 min) select UUID() as docId, linkedSwitch, '' as parentKey, timestamp group by linkedSwitch having count() > 0 insert into deviceDownAlertStream; " +
+            " from correlatedStream#window.externalTime(timestamp, 3 min) as left join deviceDownAlertStream#window.time(3 min) as right on left.linkedSwitch == right.linkedSwitch" +
+            " select left.docId, left.timestamp, left.linkedSwitch, right.docId as parentKey insert into symptomaticAlertOutputStream;");
+        pd.setDefinition(def);
+
+
+        AlertBoltSpec spec = new AlertBoltSpec();
+        spec.setVersion("version1");
+        spec.setTopologyName("testTopology");
+        spec.addBoltPolicy("alertBolt1", pd.getName());
+        spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<>(Arrays.asList(pd)));
+
+        bolt.onAlertBoltSpecChange(spec, sds);
+    }
+
+    private StreamDefinition createCorrelateStream(String streamId) {
+        // construct StreamDefinition
+        StreamDefinition schema = new StreamDefinition();
+        schema.setStreamId(streamId);
+        List<StreamColumn> columns = new LinkedList<>();
+        {
+            StreamColumn column = new StreamColumn();
+            column.setName("timestamp");
+            column.setType(StreamColumn.Type.LONG);
+            columns.add(column);
+        }
+        {
+            StreamColumn column = new StreamColumn();
+            column.setName("docId");
+            column.setType(StreamColumn.Type.STRING);
+            columns.add(column);
+        }
+        {
+            StreamColumn column = new StreamColumn();
+            column.setName("parentKey");
+            column.setType(StreamColumn.Type.STRING);
+            columns.add(column);
+        }
+        {
+            StreamColumn column = new StreamColumn();
+            column.setName("linkedSwitch");
+            column.setType(StreamColumn.Type.STRING);
+            columns.add(column);
+        }
+
+        schema.setColumns(columns);
+        return schema;
+    }
+
 }