You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/04/20 23:48:22 UTC
[1/3] storm git commit: STORM-786: KafkaBolt should ack tick tuples
Repository: storm
Updated Branches:
refs/heads/master 496bcf7d5 -> 84e8bc6d2
STORM-786: KafkaBolt should ack tick tuples
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dfe3fbe1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dfe3fbe1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dfe3fbe1
Branch: refs/heads/master
Commit: dfe3fbe144ed6398c2d2cc6210454d4c561e042e
Parents: a7c8310
Author: Michael G. Noll <mi...@michael-noll.com>
Authored: Thu Apr 16 11:44:40 2015 +0200
Committer: Michael G. Noll <mi...@michael-noll.com>
Committed: Thu Apr 16 11:44:40 2015 +0200
----------------------------------------------------------------------
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 1 +
.../test/storm/kafka/bolt/KafkaBoltTest.java | 27 ++++++++++++++++++++
2 files changed, 28 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dfe3fbe1/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index a8c4321..714ecd3 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -91,6 +91,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
@Override
public void execute(Tuple input) {
if (TupleUtils.isTick(input)) {
+ collector.ack(input);
return; // Do not try to send ticks to Kafka
}
http://git-wip-us.apache.org/repos/asf/storm/blob/dfe3fbe1/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index c30cba1..576cc12 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -18,6 +18,7 @@
package storm.kafka.bolt;
import backtype.storm.Config;
+import backtype.storm.Constants;
import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.task.IOutputCollector;
import backtype.storm.task.OutputCollector;
@@ -26,6 +27,7 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImpl;
import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
import backtype.storm.utils.Utils;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
@@ -45,7 +47,10 @@ import java.util.HashMap;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class KafkaBoltTest {
@@ -84,6 +89,18 @@ public class KafkaBoltTest {
}
@Test
+ public void shouldAcknowledgeTickTuples() throws Exception {
+ // Given
+ Tuple tickTuple = mockTickTuple();
+
+ // When
+ bolt.execute(tickTuple);
+
+ // Then
+ verify(collector).ack(tickTuple);
+ }
+
+ @Test
public void executeWithKey() throws Exception {
String message = "value-123";
String key = "key-123";
@@ -185,4 +202,14 @@ public class KafkaBoltTest {
};
return new TupleImpl(topologyContext, new Values(message), 1, "");
}
+
+ private Tuple mockTickTuple() {
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID);
+ when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID);
+ // Sanity check
+ assertTrue(TupleUtils.isTick(tuple));
+ return tuple;
+ }
+
}
[3/3] storm git commit: Added STORM-786 to changelog
Posted by bo...@apache.org.
Added STORM-786 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/84e8bc6d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/84e8bc6d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/84e8bc6d
Branch: refs/heads/master
Commit: 84e8bc6d28b54056dd75375be7d316ab03125fb6
Parents: 9220e1b
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Apr 20 16:38:34 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Apr 20 16:38:34 2015 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/84e8bc6d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 22910f2..fd32d68 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-786: KafkaBolt should ack tick tuples
* STORM-791: Storm UI displays maps in the config incorrectly
* STORM-788: UI Fix key for process latencies
* STORM-773: backtype.storm.transactional-test fails periodically with timeout
[2/3] storm git commit: Merge branch 'STORM-786' of
https://github.com/miguno/storm into STORM-786
Posted by bo...@apache.org.
Merge branch 'STORM-786' of https://github.com/miguno/storm into STORM-786
STORM-786: KafkaBolt should ack tick tuples
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9220e1b3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9220e1b3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9220e1b3
Branch: refs/heads/master
Commit: 9220e1b3287480080563d91297c9cc2c6358a1fe
Parents: 496bcf7 dfe3fbe
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Apr 20 16:32:14 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Apr 20 16:32:14 2015 -0500
----------------------------------------------------------------------
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 1 +
.../test/storm/kafka/bolt/KafkaBoltTest.java | 27 ++++++++++++++++++++
2 files changed, 28 insertions(+)
----------------------------------------------------------------------