Posted to commits@storm.apache.org by ka...@apache.org on 2017/02/14 02:55:17 UTC
[1/3] storm git commit: STORM-2340 fix AutoCommitMode issue in KafkaSpout
Repository: storm
Updated Branches:
refs/heads/1.x-branch 2128fc34a -> 191a806de
STORM-2340 fix AutoCommitMode issue in KafkaSpout
* Closes #1919
* fix: KafkaSpout is blocked in AutoCommitMode
* add comments on the impact of AutoCommitMode
* add doc about how to use KafkaSpout with at-most-once.
* remove at-most-once to better describe the changes; emit null msgId in AutoCommitMode;
* update sample code in storm-kafka-client to use inline setProp()
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f5a1cf0b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f5a1cf0b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f5a1cf0b
Branch: refs/heads/1.x-branch
Commit: f5a1cf0b25be68a2b188f888a419bb14d270e2bc
Parents: 2128fc3
Author: mingmxu <mi...@ebay.com>
Authored: Fri Feb 3 12:03:37 2017 -0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Feb 14 11:27:39 2017 +0900
----------------------------------------------------------------------
docs/storm-kafka-client.md | 22 ++++++++++
.../apache/storm/kafka/spout/KafkaSpout.java | 43 +++++++++++++-------
2 files changed, 50 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f5a1cf0b/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index 90aca55..1e2498a 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -348,3 +348,25 @@ Currently the Kafka spout has the following default values, which have shown
* offset.commit.period.ms = 30000 (30s)
* max.uncommitted.offsets = 10000000
<br/>
+
+# Kafka AutoCommitMode
+
+If reliability isn't important to you (that is, you don't care about losing tuples in failure situations) and you want to remove the overhead of tuple tracking, you can run a KafkaSpout in AutoCommitMode.
+
+To enable it, you need to:
+* set Config.TOPOLOGY_ACKER_EXECUTORS to 0 (for example with `conf.setNumAckers(0)`);
+* enable *AutoCommitMode* in the Kafka consumer configuration (`enable.auto.commit` set to `true`);
+
+Here's an example of enabling AutoCommitMode in KafkaSpout:
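+```java
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+
+// bootstrapServers is a String (e.g. "host:9092"); topics is one or more topic names
+KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
+ .builder(bootstrapServers, topics)
+ .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+ .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
+ .build();
+```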
+
+*Note that this is not exactly at-most-once processing in Storm: because offsets are committed periodically by the Kafka consumer, some tuples may be replayed if the KafkaSpout crashes.*
+
+
+
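The replay caveat in the note above can be illustrated with a toy model. The sketch below is hypothetical and is not Storm or Kafka code: it simplifies auto-commit to a commit after every `commitEvery` emitted records (the real Kafka consumer commits on a timer controlled by `auto.commit.interval.ms`), then simulates a crash and reports which offsets a restarted consumer would emit a second time. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model of why auto-commit is not exactly at-most-once.
 * Auto-commit is simplified to "commit every commitEvery records";
 * real consumers commit periodically in time, not by record count.
 */
public class AutoCommitReplaySketch {

    /**
     * Emits offsets 0..crashAt-1, recording a commit of the next offset to read
     * after every commitEvery records, then "crashes". A restarted consumer resumes
     * from the last committed offset; the offsets it emits again are returned.
     */
    public static List<Long> replayedAfterCrash(long crashAt, long commitEvery) {
        long committed = 0; // next offset a restarted consumer would read
        for (long offset = 0; offset < crashAt; offset++) {
            // the tuple for `offset` is emitted here (untracked, no msgId)
            if ((offset + 1) % commitEvery == 0) {
                committed = offset + 1; // periodic background commit
            }
        }
        // Everything emitted after the last commit is read and emitted again.
        List<Long> replayed = new ArrayList<>();
        for (long offset = committed; offset < crashAt; offset++) {
            replayed.add(offset);
        }
        return replayed;
    }

    public static void main(String[] args) {
        System.out.println(replayedAfterCrash(10, 4));
    }
}
```

With `crashAt = 10` and `commitEvery = 4`, the last commit records offset 8 as the next one to read, so offsets 8 and 9 are emitted twice and `main` prints `[8, 9]`.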
http://git-wip-us.apache.org/repos/asf/storm/blob/f5a1cf0b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index f778288..9ad2be2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -78,10 +78,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process.
// Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
- transient Map<TopicPartition, OffsetEntry> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
- private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
+ transient Map<TopicPartition, OffsetEntry> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate. Not used in AutoCommitMode
+ private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used in AutoCommitMode
private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
- private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed
+ private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed. Not used in AutoCommitMode
private transient TopologyContext context;
private transient Timer refreshSubscriptionTimer; // Used to say when a subscription should be refreshed
@@ -107,7 +107,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
// Offset management
firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
- consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
+ // In AutoCommitMode, offsets are committed periodically in the background by the Kafka consumer
+ consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
// Retries management
retryService = kafkaSpoutConfig.getRetryService();
@@ -248,14 +249,15 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private boolean poll() {
final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
- final boolean poll = !waitingToEmit() && numUncommittedOffsets < maxUncommittedOffsets;
+ final boolean poll = !waitingToEmit()
+ && (numUncommittedOffsets < maxUncommittedOffsets || consumerAutoCommitMode);
if (!poll) {
if (waitingToEmit()) {
LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
}
- if (numUncommittedOffsets >= maxUncommittedOffsets) {
+ if (numUncommittedOffsets >= maxUncommittedOffsets && !consumerAutoCommitMode) {
LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]", numUncommittedOffsets, maxUncommittedOffsets);
}
}
@@ -320,15 +322,26 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
boolean isScheduled = retryService.isScheduled(msgId);
if (!isScheduled || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
- if (tuple instanceof KafkaTuple) {
- collector.emit(((KafkaTuple)tuple).getStream(), tuple, msgId);
- } else {
- collector.emit(tuple, msgId);
- }
- emitted.add(msgId);
- numUncommittedOffsets++;
- if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
- retryService.remove(msgId);
+
+ if (consumerAutoCommitMode) {
+ if (tuple instanceof KafkaTuple) {
+ collector.emit(((KafkaTuple)tuple).getStream(), tuple);
+ } else {
+ collector.emit(tuple);
+ }
+ } else {
+ if (tuple instanceof KafkaTuple) {
+ collector.emit(((KafkaTuple)tuple).getStream(), tuple, msgId);
+ } else {
+ collector.emit(tuple, msgId);
+ }
+
+ emitted.add(msgId);
+ numUncommittedOffsets++;
+
+ if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
+ retryService.remove(msgId);
+ }
}
LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
return true;
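To make the effect of the `KafkaSpout.java` changes above easier to follow, here is a hypothetical, stripped-down model of the two pieces that changed: the poll gate and the per-emit bookkeeping. Storm's `SpoutOutputCollector`, the retry service and the Kafka consumer are left out, and a plain `Long` offset stands in for `KafkaSpoutMessageId`; the class and method names are illustrative, not part of Storm.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical model of the KafkaSpout bookkeeping touched by STORM-2340.
 * Only the poll gate and the per-emit bookkeeping are reproduced.
 */
public class AutoCommitBookkeepingSketch {
    private final boolean consumerAutoCommitMode;
    private final long maxUncommittedOffsets;
    private final Set<Long> emitted = new HashSet<>(); // tracked offsets awaiting ack/fail
    private long numUncommittedOffsets;

    public AutoCommitBookkeepingSketch(boolean consumerAutoCommitMode, long maxUncommittedOffsets) {
        this.consumerAutoCommitMode = consumerAutoCommitMode;
        this.maxUncommittedOffsets = maxUncommittedOffsets;
    }

    /** Gate before the fix: only the uncommitted-offset limit is checked. */
    public static boolean pollBeforeFix(boolean waitingToEmit, long uncommitted, long max) {
        return !waitingToEmit && uncommitted < max;
    }

    /** Gate after the fix: the limit is bypassed in AutoCommitMode. */
    public static boolean pollAfterFix(boolean waitingToEmit, long uncommitted, long max, boolean autoCommit) {
        return !waitingToEmit && (uncommitted < max || autoCommit);
    }

    /**
     * Mirrors the new emit branch. Returns the message id that would be passed to
     * the collector, or null when the tuple is emitted without one (untracked).
     */
    public Long emit(long offset) {
        if (consumerAutoCommitMode) {
            return null; // no msgId: Storm sends no ack/fail, nothing is tracked
        }
        emitted.add(offset);
        numUncommittedOffsets++;
        return offset; // stands in for KafkaSpoutMessageId
    }

    public boolean poll(boolean waitingToEmit) {
        return pollAfterFix(waitingToEmit, numUncommittedOffsets, maxUncommittedOffsets, consumerAutoCommitMode);
    }

    public long uncommitted() { return numUncommittedOffsets; }

    public int tracked() { return emitted.size(); }

    public static void main(String[] args) {
        AutoCommitBookkeepingSketch tracking = new AutoCommitBookkeepingSketch(false, 3);
        AutoCommitBookkeepingSketch auto = new AutoCommitBookkeepingSketch(true, 3);
        for (long offset = 0; offset < 3; offset++) {
            tracking.emit(offset);
        }
        for (long offset = 0; offset < 5; offset++) {
            auto.emit(offset);
        }
        System.out.println(tracking.uncommitted() + " " + tracking.poll(false));
        System.out.println(auto.uncommitted() + " " + auto.poll(false));
    }
}
```

In tracking mode the counter reaches `maxUncommittedOffsets` and polling stops (in the spout, committing acked offsets lowers the counter again), so `main` prints `3 false`. In AutoCommitMode nothing is tracked, so the counter stays at 0, the gate stays open, and `main` prints `0 true`.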
[2/3] storm git commit: Merge branch 'STORM-2340-1.x-merge' into 1.x-branch
Posted by ka...@apache.org.
Merge branch 'STORM-2340-1.x-merge' into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f90d17c9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f90d17c9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f90d17c9
Branch: refs/heads/1.x-branch
Commit: f90d17c9715b6329938f2bd41442da5250a76bdc
Parents: 2128fc3 f5a1cf0
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Feb 14 11:53:49 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Feb 14 11:53:49 2017 +0900
----------------------------------------------------------------------
docs/storm-kafka-client.md | 22 ++++++++++
.../apache/storm/kafka/spout/KafkaSpout.java | 43 +++++++++++++-------
2 files changed, 50 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: STORM-2340: CHANGELOG
Posted by ka...@apache.org.
STORM-2340: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/191a806d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/191a806d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/191a806d
Branch: refs/heads/1.x-branch
Commit: 191a806de71d3e7526206b7cb6be7fad8f7da0bd
Parents: f90d17c
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Feb 14 11:55:07 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Feb 14 11:55:07 2017 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/191a806d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2eb7667..4384f69 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-2340: fix AutoCommitMode issue in KafkaSpout
* STORM-2344: Flux YAML File Viewer for Nimbus UI
* STORM-2350: Storm-HDFS's listFilesByModificationTime is broken
* STORM-2270 Kafka spout should consume from latest when ZK partition commit offset bigger than the latest offset