Posted to commits@storm.apache.org by ka...@apache.org on 2017/02/14 02:55:17 UTC

[1/3] storm git commit: STORM-2340 fix AutoCommitMode issue in KafkaSpout

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 2128fc34a -> 191a806de


STORM-2340 fix AutoCommitMode issue in KafkaSpout

* Closes #1919
* fix: KafkaSpout is blocked in AutoCommitMode
* add comments on the impact of AutoCommitMode
* add doc about how to use KafkaSpout with at-most-once.
* remove at-most-once wording to better describe the changes; emit a null msgId in AutoCommitMode;
* update sample code in storm-kafka-client to use inline setProp()


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f5a1cf0b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f5a1cf0b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f5a1cf0b

Branch: refs/heads/1.x-branch
Commit: f5a1cf0b25be68a2b188f888a419bb14d270e2bc
Parents: 2128fc3
Author: mingmxu <mi...@ebay.com>
Authored: Fri Feb 3 12:03:37 2017 -0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Feb 14 11:27:39 2017 +0900

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      | 22 ++++++++++
 .../apache/storm/kafka/spout/KafkaSpout.java    | 43 +++++++++++++-------
 2 files changed, 50 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f5a1cf0b/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index 90aca55..1e2498a 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -348,3 +348,25 @@ Currently the Kafka spout has the following default values, which have shown
 * offset.commit.period.ms = 30000   (30s)
 * max.uncommitted.offsets = 10000000
 <br/>
+
+# Kafka AutoCommitMode 
+
+If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations -- and you want to remove the overhead of tuple tracking, then you can run a KafkaSpout with AutoCommitMode.
+
+To enable it, you need to:
+* set the number of ackers to 0 (Config.TOPOLOGY_ACKER_EXECUTORS, e.g. via Config.setNumAckers(0));
+* enable *AutoCommitMode* in the Kafka consumer configuration;
+
+Here's an example of enabling AutoCommitMode in the KafkaSpout:
+```java
+KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
+        .builder(bootstrapServers, topics)
+        .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+        .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
+        .build();
+```
+
+*Note that this is not exactly at-most-once in Storm: since offsets are committed periodically in the background by the Kafka consumer, some tuples could be replayed if the KafkaSpout crashes.*
+
+
+

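The doc snippet above covers the consumer-side setting from the bullet list; the acker setting is applied when the topology is submitted. Below is a minimal end-to-end sketch, assuming the storm-kafka-client 1.x builder API shown in this patch; the broker address, topic, component id, commit interval, and topology name are hypothetical and not part of the commit:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.topology.TopologyBuilder;

public class AutoCommitKafkaTopology {
    public static void main(String[] args) throws Exception {
        // Consumer side: let the Kafka consumer commit offsets in the background.
        KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
                .builder("127.0.0.1:9092", "test-topic")                        // hypothetical broker/topic
                .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
                .setProp(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000") // illustrative 5s commit interval
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
                .build();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout<>(kafkaConf), 1);

        // Topology side: no ackers, so nothing anchors or tracks the spout's tuples.
        Config conf = new Config();
        conf.setNumAckers(0);

        StormSubmitter.submitTopology("auto-commit-demo", conf, builder.createTopology());
    }
}
```

With zero ackers and unanchored emits, Storm does no tuple tracking at all; the auto-commit interval is then what bounds how much data can be replayed (or lost) after a crash.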
http://git-wip-us.apache.org/repos/asf/storm/blob/f5a1cf0b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index f778288..9ad2be2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -78,10 +78,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient boolean initialized;                              // Flag indicating that the spout is still undergoing initialization process.
     // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
-    transient Map<TopicPartition, OffsetEntry> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
-    private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
+    transient Map<TopicPartition, OffsetEntry> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate. Not used if it's AutoCommitMode
+    private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
     private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;         // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
-    private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed
+    private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed. Not used if it's AutoCommitMode
     private transient TopologyContext context;
     private transient Timer refreshSubscriptionTimer;                   // Used to say when a subscription should be refreshed
 
@@ -107,7 +107,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         // Offset management
         firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
-        consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
+        // with AutoCommitMode, offset will be periodically committed in the background by Kafka consumer
+        consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();  
 
         // Retries management
         retryService = kafkaSpoutConfig.getRetryService();
@@ -248,14 +249,15 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private boolean poll() {
         final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
-        final boolean poll = !waitingToEmit() && numUncommittedOffsets < maxUncommittedOffsets;
+        final boolean poll = !waitingToEmit() 
+        		&& ( numUncommittedOffsets < maxUncommittedOffsets || consumerAutoCommitMode);
 
         if (!poll) {
             if (waitingToEmit()) {
                 LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
             }
 
-            if (numUncommittedOffsets >= maxUncommittedOffsets) {
+            if (numUncommittedOffsets >= maxUncommittedOffsets && !consumerAutoCommitMode) {
                 LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]", numUncommittedOffsets, maxUncommittedOffsets);
             }
         }
@@ -320,15 +322,26 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             boolean isScheduled = retryService.isScheduled(msgId);
             if (!isScheduled || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
                 final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
-                if (tuple instanceof KafkaTuple) {
-                    collector.emit(((KafkaTuple)tuple).getStream(), tuple, msgId);
-                } else {
-                    collector.emit(tuple, msgId);
-                }
-                emitted.add(msgId);
-                numUncommittedOffsets++;
-                if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
-                    retryService.remove(msgId);
+                
+                if(consumerAutoCommitMode){
+                    if (tuple instanceof KafkaTuple) {
+                        collector.emit(((KafkaTuple)tuple).getStream(), tuple);
+                    } else {
+                        collector.emit(tuple);
+                    }
+                }else{                
+                    if (tuple instanceof KafkaTuple) {
+                        collector.emit(((KafkaTuple)tuple).getStream(), tuple, msgId);
+                    } else {
+                        collector.emit(tuple, msgId);
+                    }
+                
+                    emitted.add(msgId);
+                    numUncommittedOffsets++;
+                
+                    if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
+                        retryService.remove(msgId);
+                    }
                 }
                 LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
                 return true;

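As background for the comments added in the diff above ("offset will be periodically committed in the background by Kafka consumer"; acked/emitted/numUncommittedOffsets not used in AutoCommitMode): the spout delegates offset management entirely to the Kafka consumer's own auto-commit mechanism. A rough, Storm-independent sketch of that mechanism follows, with hypothetical broker/group/topic names and the kafka-clients API of the Storm 1.x era:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutoCommitConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");   // hypothetical broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");                // hypothetical group
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");            // the flag isConsumerAutoCommitMode() reflects
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");       // background commit every 5s
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));        // hypothetical topic
            while (true) {
                // poll(long) matches the kafka-clients versions contemporary with Storm 1.x.
                // Offsets of returned records are committed automatically on the interval above,
                // regardless of whether processing succeeded -- the source of the replay window.
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}
```

Offsets are committed on the configured interval regardless of how far processing has gotten, which is exactly the replay window the doc note in this patch warns about.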

[2/3] storm git commit: Merge branch 'STORM-2340-1.x-merge' into 1.x-branch

Posted by ka...@apache.org.
Merge branch 'STORM-2340-1.x-merge' into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f90d17c9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f90d17c9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f90d17c9

Branch: refs/heads/1.x-branch
Commit: f90d17c9715b6329938f2bd41442da5250a76bdc
Parents: 2128fc3 f5a1cf0
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Feb 14 11:53:49 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Feb 14 11:53:49 2017 +0900

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      | 22 ++++++++++
 .../apache/storm/kafka/spout/KafkaSpout.java    | 43 +++++++++++++-------
 2 files changed, 50 insertions(+), 15 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: STORM-2340: CHANGELOG

Posted by ka...@apache.org.
STORM-2340: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/191a806d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/191a806d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/191a806d

Branch: refs/heads/1.x-branch
Commit: 191a806de71d3e7526206b7cb6be7fad8f7da0bd
Parents: f90d17c
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Feb 14 11:55:07 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Feb 14 11:55:07 2017 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/191a806d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2eb7667..4384f69 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-2340: fix AutoCommitMode issue in KafkaSpout
  * STORM-2344: Flux YAML File Viewer for Nimbus UI
  * STORM-2350: Storm-HDFS's listFilesByModificationTime is broken
  * STORM-2270 Kafka spout should consume from latest when ZK partition commit offset bigger than the latest offset