You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/10/05 20:50:14 UTC
incubator-eagle git commit: [EAGLE-590]: AlertEngine: the kafka_spout
might be dropped by metadata update when the system acks a tuple
Repository: incubator-eagle
Updated Branches:
refs/heads/master f12c82f8a -> f467954ac
[EAGLE-590]: AlertEngine: the kafka_spout might be dropped by metadata update when the system acks a tuple
Author: ralphsu
This closes #475
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f467954a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f467954a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f467954a
Branch: refs/heads/master
Commit: f467954acc427763d5ab900214018a6be4830f02
Parents: f12c82f
Author: Ralph, Su <su...@gmail.com>
Authored: Thu Oct 6 02:13:56 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Thu Oct 6 04:50:31 2016 +0800
----------------------------------------------------------------------
.../alert/engine/spout/CorrelationSpout.java | 8 +-
.../alert/engine/router/TestAlertBolt.java | 151 +++++++++++++++++++
2 files changed, 157 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f467954a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
index d4266a3..67074ce 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
@@ -183,7 +183,9 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
// decode and get topic
KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
- spout.ack(id.id);
+ if (spout != null) {
+ spout.ack(id.id);
+ }
}
@Override
@@ -192,7 +194,9 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
LOG.error("Failing message {}, with topic {}", msgId, id.topic);
KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
- spout.fail(id.id);
+ if (spout != null) {
+ spout.fail(id.id);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f467954a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
index 4bec98d..2b9144d 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
@@ -435,5 +435,156 @@ public class TestAlertBolt {
Assert.assertTrue(recieved.get());
}
+ @Test
+ public void testMultiStreamDefinition() throws Exception {
+ final AtomicInteger alertCount = new AtomicInteger();
+ final Semaphore mutex = new Semaphore(0);
+ OutputCollector collector = new OutputCollector(new IOutputCollector() {
+ int count = 0;
+
+ @Override
+ public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+ System.out.println("=====output collector==========");
+ alertCount.incrementAndGet();
+ mutex.release();
+ Assert.assertTrue("symptomaticAlertOutputStream".equals((String) tuple.get(0))
+ || "deviceDownAlertStream".equals((String) tuple.get(0)));
+ AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
+ System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple));
+
+ System.out.println("**********output collector end***********");
+ return null;
+ }
+
+ @Override
+ public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
+ }
+
+ @Override
+ public void ack(Tuple input) {
+ }
+
+ @Override
+ public void fail(Tuple input) {
+ }
+
+ @Override
+ public void reportError(Throwable error) {
+ }
+ });
+
+
+ AlertBolt bolt = createAlertBolt(collector);
+
+ // construct StreamPartition
+ StreamPartition sp = new StreamPartition();
+ sp.setColumns(Collections.singletonList("col1"));
+ sp.setStreamId("correlatedStream");
+ sp.setType(StreamPartition.Type.GROUPBY);
+
+ pushAlertBoltSpec(sp, bolt);
+
+ // now emit
+ // construct GeneralTopologyContext
+ GeneralTopologyContext context = mock(GeneralTopologyContext.class);
+ int taskId = 1;
+ when(context.getComponentId(taskId)).thenReturn("comp1");
+ when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0"));
+
+ long base = System.currentTimeMillis();
+ int i = 0;
+ String linkedSwitch = "lvs-ra-01";
+
+ // construct event with "value1"
+ StreamEvent event1 = new StreamEvent();
+ event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00") * 1000);
+ event1.setMetaVersion("version1");
+ Object[] data = new Object[] { base , "child-"+ (i++), "", linkedSwitch};
+ event1.setData(data);
+ event1.setStreamId("correlatedStream");
+ PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp, 1001);
+
+ // construct another event with "value1"
+ StreamEvent event2 = new StreamEvent();
+ event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:05:00") * 1000);
+ event2.setMetaVersion("version1");
+ data = new Object[] { base , "child-"+ (i++), "", linkedSwitch};
+ event2.setData(data);
+ event2.setStreamId("correlatedStream");
+ PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp, 1001);
+
+ Tuple input = new TupleImpl(context, Collections.singletonList(partitionedEvent1), taskId, "default");
+ Tuple input2 = new TupleImpl(context, Collections.singletonList(partitionedEvent2), taskId, "default");
+ bolt.execute(input);
+ bolt.execute(input2);
+ Assert.assertTrue("Timeout to acquire mutex in 5s", mutex.tryAcquire(1, 5, TimeUnit.SECONDS));
+ Assert.assertEquals(3, alertCount.get());
+ bolt.cleanup();
+ }
+
+ private void pushAlertBoltSpec(StreamPartition sp, AlertBolt bolt) {
+ Map<String, StreamDefinition> sds = new HashMap<>();
+ sds.put("correlatedStream", createCorrelateStream("correlatedStream"));
+ sds.put("symptomaticAlertOutputStream", createCorrelateStream("symptomaticAlertOutputStream")); // output of updated correlatedStream
+ sds.put("deviceDownAlertStream", createCorrelateStream("deviceDownAlertStream"));
+
+ PolicyDefinition pd = new PolicyDefinition();
+ pd.setName("network_symptomatic");
+ pd.setInputStreams(Arrays.asList("correlatedStream"));
+ pd.setOutputStreams(Arrays.asList("deviceDownAlertStream", "symptomaticAlertOutputStream"));
+
+ pd.setPartitionSpec(Arrays.asList(sp));
+
+ PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+ def.setType(PolicyStreamHandlers.SIDDHI_ENGINE);
+ def.setValue("from correlatedStream#window.externalTime(timestamp, 3 min) select UUID() as docId, linkedSwitch, '' as parentKey, timestamp group by linkedSwitch having count() > 0 insert into deviceDownAlertStream; " +
+ " from correlatedStream#window.externalTime(timestamp, 3 min) as left join deviceDownAlertStream#window.time(3 min) as right on left.linkedSwitch == right.linkedSwitch" +
+ " select left.docId, left.timestamp, left.linkedSwitch, right.docId as parentKey insert into symptomaticAlertOutputStream;");
+ pd.setDefinition(def);
+
+
+ AlertBoltSpec spec = new AlertBoltSpec();
+ spec.setVersion("version1");
+ spec.setTopologyName("testTopology");
+ spec.addBoltPolicy("alertBolt1", pd.getName());
+ spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<>(Arrays.asList(pd)));
+
+ bolt.onAlertBoltSpecChange(spec, sds);
+ }
+
+ private StreamDefinition createCorrelateStream(String streamId) {
+ // construct StreamDefinition
+ StreamDefinition schema = new StreamDefinition();
+ schema.setStreamId(streamId);
+ List<StreamColumn> columns = new LinkedList<>();
+ {
+ StreamColumn column = new StreamColumn();
+ column.setName("timestamp");
+ column.setType(StreamColumn.Type.LONG);
+ columns.add(column);
+ }
+ {
+ StreamColumn column = new StreamColumn();
+ column.setName("docId");
+ column.setType(StreamColumn.Type.STRING);
+ columns.add(column);
+ }
+ {
+ StreamColumn column = new StreamColumn();
+ column.setName("parentKey");
+ column.setType(StreamColumn.Type.STRING);
+ columns.add(column);
+ }
+ {
+ StreamColumn column = new StreamColumn();
+ column.setName("linkedSwitch");
+ column.setType(StreamColumn.Type.STRING);
+ columns.add(column);
+ }
+
+ schema.setColumns(columns);
+ return schema;
+ }
+
}