You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/15 23:27:44 UTC

[24/27] storm git commit: STORM-786: KafkaBolt should ack tick tuples

STORM-786: KafkaBolt should ack tick tuples


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/69c5499e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/69c5499e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/69c5499e

Branch: refs/heads/0.10.x-branch
Commit: 69c5499eddb3630fcfd04d06b1ddfb51f077295b
Parents: 9116770
Author: Michael G. Noll <mi...@michael-noll.com>
Authored: Thu Apr 16 11:44:40 2015 +0200
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:51:26 2015 -0400

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |  1 +
 .../test/storm/kafka/bolt/KafkaBoltTest.java    | 27 ++++++++++++++++++++
 2 files changed, 28 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/69c5499e/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index a8c4321..714ecd3 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -91,6 +91,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     @Override
     public void execute(Tuple input) {
         if (TupleUtils.isTick(input)) {
+          collector.ack(input);
           return; // Do not try to send ticks to Kafka
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/69c5499e/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index c30cba1..576cc12 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -18,6 +18,7 @@
 package storm.kafka.bolt;
 
 import backtype.storm.Config;
+import backtype.storm.Constants;
 import backtype.storm.task.GeneralTopologyContext;
 import backtype.storm.task.IOutputCollector;
 import backtype.storm.task.OutputCollector;
@@ -26,6 +27,7 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.TupleImpl;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
 import backtype.storm.utils.Utils;
 import kafka.api.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
@@ -45,7 +47,10 @@ import java.util.HashMap;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class KafkaBoltTest {
 
@@ -84,6 +89,18 @@ public class KafkaBoltTest {
     }
 
     @Test
+    public void shouldAcknowledgeTickTuples() throws Exception {
+        // Given
+        Tuple tickTuple = mockTickTuple();
+
+        // When
+        bolt.execute(tickTuple);
+
+        // Then
+        verify(collector).ack(tickTuple);
+    }
+
+    @Test
     public void executeWithKey() throws Exception {
         String message = "value-123";
         String key = "key-123";
@@ -185,4 +202,14 @@ public class KafkaBoltTest {
         };
         return new TupleImpl(topologyContext, new Values(message), 1, "");
     }
+
+    private Tuple mockTickTuple() {
+        Tuple tuple = mock(Tuple.class);
+        when(tuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID);
+        when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID);
+        // Sanity check
+        assertTrue(TupleUtils.isTick(tuple));
+        return tuple;
+    }
+
 }