You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/15 23:27:44 UTC
[24/27] storm git commit: STORM-786: KafkaBolt should ack tick tuples
STORM-786: KafkaBolt should ack tick tuples
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/69c5499e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/69c5499e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/69c5499e
Branch: refs/heads/0.10.x-branch
Commit: 69c5499eddb3630fcfd04d06b1ddfb51f077295b
Parents: 9116770
Author: Michael G. Noll <mi...@michael-noll.com>
Authored: Thu Apr 16 11:44:40 2015 +0200
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:51:26 2015 -0400
----------------------------------------------------------------------
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 1 +
.../test/storm/kafka/bolt/KafkaBoltTest.java | 27 ++++++++++++++++++++
2 files changed, 28 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/69c5499e/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index a8c4321..714ecd3 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -91,6 +91,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
@Override
public void execute(Tuple input) {
if (TupleUtils.isTick(input)) {
+ collector.ack(input);
return; // Do not try to send ticks to Kafka
}
http://git-wip-us.apache.org/repos/asf/storm/blob/69c5499e/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index c30cba1..576cc12 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -18,6 +18,7 @@
package storm.kafka.bolt;
import backtype.storm.Config;
+import backtype.storm.Constants;
import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.task.IOutputCollector;
import backtype.storm.task.OutputCollector;
@@ -26,6 +27,7 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImpl;
import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
import backtype.storm.utils.Utils;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
@@ -45,7 +47,10 @@ import java.util.HashMap;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class KafkaBoltTest {
@@ -84,6 +89,18 @@ public class KafkaBoltTest {
}
@Test
+ public void shouldAcknowledgeTickTuples() throws Exception {
+ // Given
+ Tuple tickTuple = mockTickTuple();
+
+ // When
+ bolt.execute(tickTuple);
+
+ // Then
+ verify(collector).ack(tickTuple);
+ }
+
+ @Test
public void executeWithKey() throws Exception {
String message = "value-123";
String key = "key-123";
@@ -185,4 +202,14 @@ public class KafkaBoltTest {
};
return new TupleImpl(topologyContext, new Values(message), 1, "");
}
+
+ private Tuple mockTickTuple() {
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID);
+ when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID);
+ // Sanity check
+ assertTrue(TupleUtils.isTick(tuple));
+ return tuple;
+ }
+
}