You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/04/20 23:48:22 UTC

[1/3] storm git commit: STORM-786: KafkaBolt should ack tick tuples

Repository: storm
Updated Branches:
  refs/heads/master 496bcf7d5 -> 84e8bc6d2


STORM-786: KafkaBolt should ack tick tuples


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dfe3fbe1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dfe3fbe1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dfe3fbe1

Branch: refs/heads/master
Commit: dfe3fbe144ed6398c2d2cc6210454d4c561e042e
Parents: a7c8310
Author: Michael G. Noll <mi...@michael-noll.com>
Authored: Thu Apr 16 11:44:40 2015 +0200
Committer: Michael G. Noll <mi...@michael-noll.com>
Committed: Thu Apr 16 11:44:40 2015 +0200

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |  1 +
 .../test/storm/kafka/bolt/KafkaBoltTest.java    | 27 ++++++++++++++++++++
 2 files changed, 28 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dfe3fbe1/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index a8c4321..714ecd3 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -91,6 +91,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     @Override
     public void execute(Tuple input) {
         if (TupleUtils.isTick(input)) {
+          collector.ack(input);
           return; // Do not try to send ticks to Kafka
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/dfe3fbe1/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index c30cba1..576cc12 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -18,6 +18,7 @@
 package storm.kafka.bolt;
 
 import backtype.storm.Config;
+import backtype.storm.Constants;
 import backtype.storm.task.GeneralTopologyContext;
 import backtype.storm.task.IOutputCollector;
 import backtype.storm.task.OutputCollector;
@@ -26,6 +27,7 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.TupleImpl;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
 import backtype.storm.utils.Utils;
 import kafka.api.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
@@ -45,7 +47,10 @@ import java.util.HashMap;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class KafkaBoltTest {
 
@@ -84,6 +89,18 @@ public class KafkaBoltTest {
     }
 
     @Test
+    public void shouldAcknowledgeTickTuples() throws Exception {
+        // Given
+        Tuple tickTuple = mockTickTuple();
+
+        // When
+        bolt.execute(tickTuple);
+
+        // Then
+        verify(collector).ack(tickTuple);
+    }
+
+    @Test
     public void executeWithKey() throws Exception {
         String message = "value-123";
         String key = "key-123";
@@ -185,4 +202,14 @@ public class KafkaBoltTest {
         };
         return new TupleImpl(topologyContext, new Values(message), 1, "");
     }
+
+    private Tuple mockTickTuple() {
+        Tuple tuple = mock(Tuple.class);
+        when(tuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID);
+        when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID);
+        // Sanity check
+        assertTrue(TupleUtils.isTick(tuple));
+        return tuple;
+    }
+
 }


[3/3] storm git commit: Added STORM-786 to changelog

Posted by bo...@apache.org.
Added STORM-786 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/84e8bc6d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/84e8bc6d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/84e8bc6d

Branch: refs/heads/master
Commit: 84e8bc6d28b54056dd75375be7d316ab03125fb6
Parents: 9220e1b
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Apr 20 16:38:34 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Apr 20 16:38:34 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/84e8bc6d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 22910f2..fd32d68 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-786: KafkaBolt should ack tick tuples
  * STORM-791: Storm UI displays maps in the config incorrectly
  * STORM-788: UI Fix key for process latencies
  * STORM-773: backtype.storm.transactional-test fails periodically with timeout


[2/3] storm git commit: Merge branch 'STORM-786' of https://github.com/miguno/storm into STORM-786

Posted by bo...@apache.org.
Merge branch 'STORM-786' of https://github.com/miguno/storm into STORM-786

STORM-786: KafkaBolt should ack tick tuples


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9220e1b3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9220e1b3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9220e1b3

Branch: refs/heads/master
Commit: 9220e1b3287480080563d91297c9cc2c6358a1fe
Parents: 496bcf7 dfe3fbe
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Apr 20 16:32:14 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Apr 20 16:32:14 2015 -0500

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |  1 +
 .../test/storm/kafka/bolt/KafkaBoltTest.java    | 27 ++++++++++++++++++++
 2 files changed, 28 insertions(+)
----------------------------------------------------------------------