Posted to commits@storm.apache.org by ka...@apache.org on 2017/09/29 05:28:01 UTC

[1/2] storm git commit: STORM-2648/STORM-2357: Add storm-kafka-client support for at-most-once processing and a toggle for whether messages should be emitted with a message id when not using at-least-once

Repository: storm
Updated Branches:
  refs/heads/master 5c05bb74c -> 293643f09


STORM-2648/STORM-2357: Add storm-kafka-client support for at-most-once processing and a toggle for whether messages should be emitted with a message id when not using at-least-once

* Minor refactor of emit statements
* Add tests for at-most-once and any-times mode, deduplicate some test code in other tests
* Fix rebase conflicts and fix leaking state through unit test retry service
* Update storm-kafka-client doc
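
As a reading aid for the diff below, here is a minimal, dependency-free sketch of how the three processing guarantees appear to map onto the consumer's auto-commit setting and onto tuple tracking. The class `ProcessingGuaranteeSketch` and the helpers `applyGuarantee` and `emitsWithMessageId` are hypothetical names for illustration and are not part of storm-kafka-client. The enum is redeclared locally, and the key `enable.auto.commit` is the value of Kafka's `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`.

```java
import java.util.HashMap;
import java.util.Map;

public class ProcessingGuaranteeSketch {

    // Mirrors the three modes named in this commit; redeclared so the sketch has no Storm dependency.
    enum ProcessingGuarantee { AT_LEAST_ONCE, ANY_TIMES, AT_MOST_ONCE }

    // Value of Kafka's ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    // Hypothetical helper: returns a copy of the consumer properties with auto commit derived from the guarantee.
    static Map<String, Object> applyGuarantee(Map<String, Object> kafkaProps, ProcessingGuarantee guarantee) {
        if (kafkaProps.containsKey(ENABLE_AUTO_COMMIT)) {
            // The commit rejects a manually set value in favour of setProcessingGuarantee.
            throw new IllegalArgumentException("Do not set " + ENABLE_AUTO_COMMIT + " manually");
        }
        Map<String, Object> result = new HashMap<>(kafkaProps);
        // Only ANY_TIMES delegates committing to the consumer's background auto commit.
        result.put(ENABLE_AUTO_COMMIT, guarantee == ProcessingGuarantee.ANY_TIMES ? "true" : "false");
        return result;
    }

    // Hypothetical helper: whether tuples are emitted with a message id, i.e. tracked by Storm.
    static boolean emitsWithMessageId(ProcessingGuarantee guarantee, boolean forceEnableTupleTracking) {
        return guarantee == ProcessingGuarantee.AT_LEAST_ONCE || forceEnableTupleTracking;
    }

    public static void main(String[] args) {
        for (ProcessingGuarantee g : ProcessingGuarantee.values()) {
            Map<String, Object> props = applyGuarantee(new HashMap<>(), g);
            System.out.println(g + ": " + ENABLE_AUTO_COMMIT + "=" + props.get(ENABLE_AUTO_COMMIT)
                + ", message id by default=" + emitsWithMessageId(g, false));
        }
    }
}
```

In this sketch the force-tracking flag only changes the outcome for ANY_TIMES and AT_MOST_ONCE, which matches the documentation note that the setting has no effect in at-least-once mode.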


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/48f69690
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/48f69690
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/48f69690

Branch: refs/heads/master
Commit: 48f6969027e7b02a5b9220577189d3911aa2226d
Parents: 5c05bb7
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Mon Jul 31 20:26:55 2017 +0200
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Sep 29 14:25:59 2017 +0900

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      |  31 ++-
 .../apache/storm/kafka/spout/KafkaSpout.java    | 157 +++++++-------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  85 +++++++-
 .../spout/KafkaSpoutMessagingGuaranteeTest.java | 207 +++++++++++++++++++
 .../kafka/spout/MaxUncommittedOffsetTest.java   |  27 +--
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |  47 ++---
 .../spout/SingleTopicKafkaUnitSetupHelper.java  |  67 ++++++
 .../SingleTopicKafkaSpoutConfiguration.java     |  17 +-
 8 files changed, 468 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/48f69690/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index a1814b4..a24c632 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -291,33 +291,32 @@ Depending on the structure of your Kafka cluster, distribution of the data, and
 
 ### Default values
 
-Currently the Kafka spout has has the following default values, which have shown to give good performance in the test environment as described in this [blog post] (https://hortonworks.com/blog/microbenchmarking-storm-1-0-performance/)
+Currently the Kafka spout has has the following default values, which have been shown to give good performance in the test environment as described in this [blog post] (https://hortonworks.com/blog/microbenchmarking-storm-1-0-performance/)
 
 * poll.timeout.ms = 200
 * offset.commit.period.ms = 30000   (30s)
 * max.uncommitted.offsets = 10000000
 <br/>
 
-# Kafka AutoCommitMode 
+# Messaging reliability modes
 
-If reliability isn't important to you -- that is, you don't care about losing tuples in failure situations --, and want to remove the overhead of tuple tracking, then you can run a KafkaSpout with AutoCommitMode.
-
-To enable it, you need to:
-
-* set Config.TOPOLOGY_ACKERS to 0;
-* enable *AutoCommitMode* in Kafka consumer configuration; 
-
-Here's one example to set AutoCommitMode in KafkaSpout:
+In some cases you may not need or want the spout to guarantee at-least-once processing of messages. The spout also supports at-most-once and any-times modes. At-most-once guarantees that any tuple emitted to the topology will never be reemitted. Any-times makes no guarantees, but may reduce the overhead of committing offsets to Kafka in cases where you truly don't care how many times a message is processed.
 
+To set the processing guarantee, use the KafkaSpoutConfig.Builder.setProcessingGuarantee method, e.g.
 ```java
 KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
-		.builder(String bootstrapServers, String ... topics)
-		.setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
-		.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
-		.build();
+  .builder(String bootstrapServers, String ... topics)
+  .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
 ```
 
-*Note that it's not exactly At-Most-Once in Storm, as offset is committed periodically by Kafka consumer, some tuples could be replayed when KafkaSpout is crashed.*
-
+The spout will disable tuple tracking for emitted tuples by default when you use at-most-once or any-times. In some cases you may want to enable tracking anyway, because tuple tracking is necessary for some features of Storm, e.g. showing complete latency in Storm UI, or enabling backpressure through the `Config.TOPOLOGY_MAX_SPOUT_PENDING` parameter.
 
+If you need to enable tracking, use the KafkaSpoutConfig.Builder.setForceEnableTupleTracking method, e.g.
+```java
+KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
+  .builder(String bootstrapServers, String ... topics)
+  .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
+  .setForceEnableTupleTracking(true)
+```
 
+Note that this setting has no effect in at-least-once mode, where tuple tracking is always enabled.
\ No newline at end of file
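
The at-most-once guarantee described in the documentation change above rests on committing polled offsets before any of them are emitted. The following toy simulation illustrates why that ordering prevents re-emission after a crash. It is only a sketch: `AtMostOnceSketch`, `FakePartition`, and `pollCommitEmit` are hypothetical stand-ins, an in-memory list plays the role of a Kafka partition, and the real spout, consumer, and network are not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AtMostOnceSketch {

    // Hypothetical stand-in for one Kafka partition together with its committed offset.
    static final class FakePartition {
        final List<String> records;
        int committedOffset = 0;

        FakePartition(List<String> records) {
            this.records = records;
        }

        // Returns up to maxRecords records starting at the committed offset.
        List<String> poll(int maxRecords) {
            int end = Math.min(records.size(), committedOffset + maxRecords);
            return new ArrayList<>(records.subList(committedOffset, end));
        }
    }

    // Polls one batch, commits it immediately, then emits until a simulated crash.
    static List<String> pollCommitEmit(FakePartition partition, int batchSize, int emitsBeforeCrash) {
        List<String> batch = partition.poll(batchSize);
        // Commit first: after this line the batch is never polled again, even if emission fails.
        partition.committedOffset += batch.size();
        List<String> emitted = new ArrayList<>();
        for (String record : batch) {
            if (emitted.size() == emitsBeforeCrash) {
                break; // simulated crash: the rest of the batch is lost, not replayed
            }
            emitted.add(record);
        }
        return emitted;
    }

    public static void main(String[] args) {
        FakePartition partition = new FakePartition(Arrays.asList("a", "b", "c", "d", "e"));
        List<String> beforeCrash = pollCommitEmit(partition, 3, 1);
        List<String> afterRestart = pollCommitEmit(partition, 3, Integer.MAX_VALUE);
        System.out.println("emitted before crash: " + beforeCrash);
        System.out.println("emitted after restart: " + afterRestart);
    }
}
```

In this run the records "b" and "c" are never emitted, but no record is emitted twice, which is the trade-off at-most-once accepts.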

http://git-wip-us.apache.org/repos/asf/storm/blob/48f69690/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index cafac3e..d8a6d83 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -51,10 +51,12 @@ import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaSpout<K, V> extends BaseRichSpout {
+
     private static final long serialVersionUID = 4151921085047987154L;
     //Initial delay for the commit and subscription refresh timers
     public static final long TIMER_DELAY_MS = 500;
@@ -65,37 +67,36 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     // Kafka
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
-    private KafkaConsumerFactory kafkaConsumerFactory;
+    private KafkaConsumerFactory<K, V> kafkaConsumerFactory;
     private transient KafkaConsumer<K, V> kafkaConsumer;
-    private transient boolean consumerAutoCommitMode;
 
     // Bookkeeping
     // Strategy to determine the fetch offset of the first realized by the spout upon activation
-    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  
-    // Class that has the logic to handle tuple failure
+    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
+    // Class that has the logic to handle tuple failure.
     private transient KafkaSpoutRetryService retryService;
     // Handles tuple events (emit, ack etc.)
     private transient KafkaTupleListener tupleListener;
-    // timer == null for auto commit mode
-    private transient Timer commitTimer;                                
+    // timer == null for modes other than at-least-once
+    private transient Timer commitTimer;
     // Flag indicating that the spout is still undergoing initialization process.
-    private transient boolean initialized;                              
+    private transient boolean initialized;
     // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
     // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
-    //or after a consumer rebalance, or during close/deactivate
+    //or after a consumer rebalance, or during close/deactivate. Always empty if not using at-least-once mode.
     private transient Map<TopicPartition, OffsetManager> offsetManagers;
-    // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
-    private transient Set<KafkaSpoutMessageId> emitted;                 
+    // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed.
+    // Always empty if not using at-least-once mode.
+    private transient Set<KafkaSpoutMessageId> emitted;
     // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
-    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;     
+    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;
     // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled.
-    private transient long numUncommittedOffsets;                       
+    private transient long numUncommittedOffsets;
     // Triggers when a subscription should be refreshed
-    private transient Timer refreshSubscriptionTimer;                   
+    private transient Timer refreshSubscriptionTimer;
     private transient TopologyContext context;
 
-
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
         this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>());
     }
@@ -117,15 +118,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         // Offset management
         firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
-        // with AutoCommitMode, offset will be periodically committed in the background by Kafka consumer
-        consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
-
+        
         // Retries management
         retryService = kafkaSpoutConfig.getRetryService();
 
         tupleListener = kafkaSpoutConfig.getTupleListener();
 
-        if (!consumerAutoCommitMode) {     // If it is auto commit, no need to commit offsets manually
+        if (isAtLeastOnce()) {
+            // Only used if the spout commits offsets for acked tuples
             commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
         refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
@@ -139,14 +139,18 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
     }
 
-    // =========== Consumer Rebalance Listener - On the same thread as the caller ===========
+    private boolean isAtLeastOnce() {
+        return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
+    }
 
+    // =========== Consumer Rebalance Listener - On the same thread as the caller ===========
     private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
+
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
             LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
-                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
-            if (!consumerAutoCommitMode && initialized) {
+                kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+            if (isAtLeastOnce() && initialized) {
                 initialized = false;
                 commitOffsetsForAckedTuples();
             }
@@ -155,37 +159,40 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]",
-                    context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+                context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
 
             tupleListener.onPartitionsReassigned(partitions);
             initialize(partitions);
         }
 
         private void initialize(Collection<TopicPartition> partitions) {
-            if (!consumerAutoCommitMode) {
+            if (isAtLeastOnce()) {
                 // remove from acked all partitions that are no longer assigned to this spout
                 offsetManagers.keySet().retainAll(partitions);
-            }
-
-            retryService.retainAll(partitions);
-
-            /*
-             * Emitted messages for partitions that are no longer assigned to this spout can't
-             * be acked and should not be retried, hence remove them from emitted collection.
-            */
-            Set<TopicPartition> partitionsSet = new HashSet<>(partitions);
-            Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator();
-            while (msgIdIterator.hasNext()) {
-                KafkaSpoutMessageId msgId = msgIdIterator.next();
-                if (!partitionsSet.contains(msgId.getTopicPartition())) {
-                    msgIdIterator.remove();
+                retryService.retainAll(partitions);
+
+                /*
+                 * Emitted messages for partitions that are no longer assigned to this spout can't
+                 * be acked and should not be retried, hence remove them from emitted collection.
+                 */
+                Set<TopicPartition> partitionsSet = new HashSet<>(partitions);
+                Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator();
+                while (msgIdIterator.hasNext()) {
+                    KafkaSpoutMessageId msgId = msgIdIterator.next();
+                    if (!partitionsSet.contains(msgId.getTopicPartition())) {
+                        msgIdIterator.remove();
+                    }
                 }
             }
 
             for (TopicPartition tp : partitions) {
                 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
                 final long fetchOffset = doSeek(tp, committedOffset);
-                setAcked(tp, fetchOffset);
+                // Add offset managers for the new partitions.
+                // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
+                if (isAtLeastOnce() && !offsetManagers.containsKey(tp)) {
+                    offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
+                }
             }
             initialized = true;
             LOG.info("Initialization complete");
@@ -220,15 +227,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
     }
 
-    private void setAcked(TopicPartition tp, long fetchOffset) {
-        // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
-        if (!consumerAutoCommitMode && !offsetManagers.containsKey(tp)) {
-            offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
-        }
-    }
-
     // ======== Next Tuple =======
-
     @Override
     public void nextTuple() {
         try {
@@ -263,7 +262,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     private boolean commit() {
-        return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
+        return isAtLeastOnce() && commitTimer.isExpiredResetOnTrue();    // timer != null in at-least-once mode
     }
 
     private boolean poll() {
@@ -274,15 +273,15 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis,
             //and prevents locking up the spout when there are too many retriable tuples
             && (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets
-            || consumerAutoCommitMode);
-        
+            || !isAtLeastOnce());
+
         if (!poll) {
             if (waitingToEmit()) {
                 LOG.debug("Not polling. Tuples waiting to be emitted."
                     + " [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
             }
 
-            if (numUncommittedOffsets >= maxUncommittedOffsets && !consumerAutoCommitMode) {
+            if (numUncommittedOffsets >= maxUncommittedOffsets && isAtLeastOnce()) {
                 LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]",
                     numUncommittedOffsets, maxUncommittedOffsets);
             }
@@ -312,6 +311,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         final int numPolledRecords = consumerRecords.count();
         LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions",
             numPolledRecords, numUncommittedOffsets);
+        if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
+            //Commit polled records immediately to ensure delivery is at-most-once.
+            kafkaConsumer.commitSync();
+        }
         return consumerRecords;
     }
 
@@ -350,11 +353,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 final boolean isScheduled = retryService.isScheduled(msgId);
                 // not scheduled <=> never failed (i.e. never emitted), or scheduled and ready to be retried
                 if (!isScheduled || retryService.isReady(msgId)) {
-                    if (consumerAutoCommitMode) {
-                        if (tuple instanceof KafkaTuple) {
-                            collector.emit(((KafkaTuple) tuple).getStream(), tuple);
+                    String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
+                    if (!isAtLeastOnce()) {
+                        if (kafkaSpoutConfig.getForceEnableTupleTracking()) {
+                            collector.emit(stream, tuple, msgId);
+                            LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                         } else {
-                            collector.emit(tuple);
+                            collector.emit(stream, tuple);
+                            LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
                         }
                     } else {
                         emitted.add(msgId);
@@ -364,15 +370,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                         } else {            //New tuple, hence increment the uncommitted offset counter
                             numUncommittedOffsets++;
                         }
-
-                        if (tuple instanceof KafkaTuple) {
-                            collector.emit(((KafkaTuple) tuple).getStream(), tuple, msgId);
-                        } else {
-                            collector.emit(tuple, msgId);
-                        }
+                        collector.emit(stream, tuple, msgId);
                         tupleListener.onEmit(tuple, msgId);
+                        LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                     }
-                    LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
                     return true;
                 }
             } else {
@@ -414,7 +415,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 long numCommittedOffsets = offsetManager.commit(tpOffset.getValue());
                 numUncommittedOffsets -= numCommittedOffsets;
                 LOG.debug("[{}] uncommitted offsets across all topic partitions",
-                        numUncommittedOffsets);
+                    numUncommittedOffsets);
             }
         } else {
             LOG.trace("No offsets to commit. {}", this);
@@ -422,31 +423,37 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     // ======== Ack =======
-
     @Override
     public void ack(Object messageId) {
+        if (!isAtLeastOnce()) {
+            // Only need to keep track of acked tuples if commits are done based on acks
+            return;
+        }
+
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!emitted.contains(msgId)) {
             if (msgId.isEmitted()) {
                 LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
-                        + "came from a topic-partition that this consumer group instance is no longer tracking "
-                        + "due to rebalance/partition reassignment. No action taken.", msgId);
+                    + "came from a topic-partition that this consumer group instance is no longer tracking "
+                    + "due to rebalance/partition reassignment. No action taken.", msgId);
             } else {
                 LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
             }
         } else {
-            if (!consumerAutoCommitMode) {  // Only need to keep track of acked tuples if commits are not done automatically
-                offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
-            }
+            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
             emitted.remove(msgId);
         }
         tupleListener.onAck(msgId);
     }
 
     // ======== Fail =======
-
     @Override
     public void fail(Object messageId) {
+        if (!isAtLeastOnce()) {
+            // Only need to keep track of failed tuples if commits are done based on acks
+            return;
+        }
+
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!emitted.contains(msgId)) {
             LOG.debug("Received fail for tuple this spout is no longer tracking."
@@ -467,7 +474,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     // ======== Activate / Deactivate / Close / Declare Outputs =======
-
     @Override
     public void activate() {
         try {
@@ -503,7 +509,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private void shutdown() {
         try {
-            if (!consumerAutoCommitMode) {
+            if (isAtLeastOnce()) {
                 commitOffsetsForAckedTuples();
             }
         } finally {
@@ -523,9 +529,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public String toString() {
         return "KafkaSpout{"
-                + "offsetManagers =" + offsetManagers
-                + ", emitted=" + emitted
-                + "}";
+            + "offsetManagers =" + offsetManagers
+            + ", emitted=" + emitted
+            + "}";
     }
 
     @Override
@@ -548,6 +554,3 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return kafkaSpoutConfig.getSubscription().getTopicsString();
     }
 }
-
-
-

http://git-wip-us.apache.org/repos/asf/storm/blob/48f69690/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index d1940ec..f211697 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.storm.Config;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
 import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription;
 import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
@@ -58,6 +59,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
         new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
             DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
+    public static final ProcessingGuarantee DEFAULT_PROCESSING_GUARANTEE = ProcessingGuarantee.AT_LEAST_ONCE;
 
     public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener();
 
@@ -75,6 +77,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     private final KafkaTupleListener tupleListener;
     private final long partitionRefreshPeriodMs;
     private final boolean emitNullTuples;
+    private final ProcessingGuarantee processingGuarantee;
+    private final boolean forceEnableTupleTracking;
 
     /**
      * Creates a new KafkaSpoutConfig using a Builder.
@@ -82,7 +86,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
      * @param builder The Builder to construct the KafkaSpoutConfig from
      */
     public KafkaSpoutConfig(Builder<K, V> builder) {
-        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
+        setAutoCommitMode(builder);
+        this.kafkaProps = builder.kafkaProps;
         this.subscription = builder.subscription;
         this.translator = builder.translator;
         this.pollTimeoutMs = builder.pollTimeoutMs;
@@ -93,6 +98,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         this.tupleListener = builder.tupleListener;
         this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
         this.emitNullTuples = builder.emitNullTuples;
+        this.processingGuarantee = builder.processingGuarantee;
+        this.forceEnableTupleTracking = builder.forceEnableTupleTracking;
     }
 
     /**
@@ -118,6 +125,25 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         UNCOMMITTED_LATEST
     }
 
+    /**
+     * The processing guarantee supported by the spout. This parameter affects when the spout commits offsets to Kafka, marking them as
+     * processed.
+     *
+     * <ul>
+     * <li>AT_LEAST_ONCE means that the Kafka spout considers an offset ready for commit once a tuple corresponding to that offset has been
+     * acked on the spout. This corresponds to an at-least-once guarantee.</li>
+     * <li>ANY_TIMES means that the Kafka spout may commit polled offsets at any time. This means the message may be processed any number of
+     * times (including 0), and causes the spout to enable auto offset committing on the underlying consumer.</li>
+     * <li>AT_MOST_ONCE means that the spout will commit polled offsets before emitting them to the topology. This guarantees at-most-once
+     * processing.</li>
+     * </ul>
+     */
+    public static enum ProcessingGuarantee {
+        AT_LEAST_ONCE,
+        ANY_TIMES,
+        AT_MOST_ONCE
+    }
+
     public static class Builder<K, V> {
 
         private final Map<String, Object> kafkaProps;
@@ -131,6 +157,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         private KafkaTupleListener tupleListener = DEFAULT_TUPLE_LISTENER;
         private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
         private boolean emitNullTuples = false;
+        private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE;
+        private boolean forceEnableTupleTracking = false;
 
         public Builder(String bootstrapServers, String... topics) {
             this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
@@ -205,6 +233,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         /**
          * Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s.
          *
+         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+         *
          * @param offsetCommitPeriodMs time in ms
          */
         public Builder<K, V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
@@ -218,6 +248,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
          * below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}. Note that this limit can in some cases be exceeded,
          * but no partition will exceed this limit by more than maxPollRecords - 1.
          *
+         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+         *
          * @param maxUncommittedOffsets max number of records that can be be pending commit
          */
         public Builder<K, V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
@@ -239,6 +271,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         /**
          * Sets the retry service for the spout to use.
          *
+         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+         *
          * @param retryService the new retry service
          * @return the builder (this).
          */
@@ -316,6 +350,32 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             return this;
         }
 
+        /**
+         * Specifies which processing guarantee the spout should offer. Refer to the documentation for {@link ProcessingGuarantee}.
+         *
+         * @param processingGuarantee The processing guarantee the spout should offer.
+         */
+        public Builder<K, V> setProcessingGuarantee(ProcessingGuarantee processingGuarantee) {
+            this.processingGuarantee = processingGuarantee;
+            return this;
+        }
+
+        /**
+         * Specifies whether the spout should require Storm to track emitted tuples when using a {@link ProcessingGuarantee} other than
+         * {@link ProcessingGuarantee#AT_LEAST_ONCE}. The spout will always track emitted tuples when offering at-least-once guarantees
+         * regardless of this setting. This setting is false by default.
+         *
+         * <p>Enabling tracking can be useful even in cases where reliability is not a concern, because it allows
+         * {@link Config#TOPOLOGY_MAX_SPOUT_PENDING} to have an effect, and enables some spout metrics (e.g. complete-latency) that would
+         * otherwise be disabled.
+         *
+         * @param forceEnableTupleTracking true if Storm should track emitted tuples, false otherwise
+         */
+        public Builder<K, V> setForceEnableTupleTracking(boolean forceEnableTupleTracking) {
+            this.forceEnableTupleTracking = forceEnableTupleTracking;
+            return this;
+        }
+
         public KafkaSpoutConfig<K, V> build() {
             return new KafkaSpoutConfig<>(this);
         }
@@ -360,12 +420,16 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return builder;
     }
 
-    private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
-        // set defaults for properties not specified
-        if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-            kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+    private static void setAutoCommitMode(Builder<?, ?> builder) {
+        if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+            throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
+                + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
+        }
+        if (builder.processingGuarantee == ProcessingGuarantee.ANY_TIMES) {
+            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        } else {
+            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         }
-        return kafkaProps;
     }
 
     /**
@@ -393,9 +457,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return offsetCommitPeriodMs;
     }
 
-    public boolean isConsumerAutoCommitMode() {
-        return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false
-            || Boolean.valueOf((String) kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+    public ProcessingGuarantee getProcessingGuarantee() {
+        return processingGuarantee;
+    }
+
+    public boolean getForceEnableTupleTracking() {
+        return forceEnableTupleTracking;
     }
 
     public String getConsumerGroupId() {
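
Note on the setAutoCommitMode hunk above: the rule it introduces can be tried in isolation with the sketch below. This is only an illustration, not the committed code. The class AutoCommitSketch, the enum Guarantee and the method applyAutoCommitMode are hypothetical stand-ins, the sketch returns a copy of the properties instead of mutating the builder, and it assumes only that ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG resolves to the Kafka property name "enable.auto.commit".

```java
import java.util.HashMap;
import java.util.Map;

public class AutoCommitSketch {

    // Hypothetical stand-in for KafkaSpoutConfig.ProcessingGuarantee.
    enum Guarantee { AT_LEAST_ONCE, AT_MOST_ONCE, ANY_TIMES }

    // Kafka consumer property name behind ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    // Rejects a user-supplied auto-commit setting, then derives it from the guarantee:
    // only ANY_TIMES lets the consumer commit offsets on its own.
    static Map<String, Object> applyAutoCommitMode(Map<String, Object> userProps, Guarantee guarantee) {
        if (userProps.containsKey(ENABLE_AUTO_COMMIT)) {
            throw new IllegalArgumentException("Do not set " + ENABLE_AUTO_COMMIT + " manually.");
        }
        Map<String, Object> props = new HashMap<>(userProps);
        props.put(ENABLE_AUTO_COMMIT, guarantee == Guarantee.ANY_TIMES ? "true" : "false");
        return props;
    }

    public static void main(String[] args) {
        for (Guarantee g : Guarantee.values()) {
            Object value = applyAutoCommitMode(new HashMap<>(), g).get(ENABLE_AUTO_COMMIT);
            System.out.println(g + " -> " + ENABLE_AUTO_COMMIT + "=" + value);
        }
    }
}
```

Throwing on a manually set property, rather than silently overriding it, makes a conflicting configuration fail at build time instead of at runtime.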

http://git-wip-us.apache.org/repos/asf/storm/blob/48f69690/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
new file mode 100644
index 0000000..1d96232
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+
+public class KafkaSpoutMessagingGuaranteeTest {
+
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+
+    @Before
+    public void setUp() {
+        consumerMock = mock(KafkaConsumer.class);
+    }
+
+    @Test
+    public void testAtMostOnceModeCommitsBeforeEmit() throws Exception {
+        //At-most-once mode must commit tuples before they are emitted to the topology to ensure that a spout crash won't cause replays.
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .build();
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+            SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
+
+        spout.nextTuple();
+
+        //The spout should have emitted the tuple, and must have committed it before emit
+        InOrder inOrder = inOrder(consumerMock, collectorMock);
+        inOrder.verify(consumerMock).poll(anyLong());
+        inOrder.verify(consumerMock).commitSync();
+        inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+    }
+
+    private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) {
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+
+        when(consumerMock.poll(anyLong()))
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()))))
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.createRecords(partition, spoutConfig.getMaxUncommittedOffsets() - 1, spoutConfig.getMaxUncommittedOffsets()))));
+
+        for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() * 2; i++) {
+            spout.nextTuple();
+        }
+
+        verify(consumerMock, times(2)).poll(anyLong());
+        verify(collectorMock, times(spoutConfig.getMaxUncommittedOffsets() * 2)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+    }
+
+    @Test
+    public void testAtMostOnceModeDisregardsMaxUncommittedOffsets() throws Exception {
+        //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .build();
+        doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
+    }
+
+    @Test
+    public void testAnyTimesModeDisregardsMaxUncommittedOffsets() throws Exception {
+        //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES)
+            .build();
+        doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
+    }
+
+    private void doTestModeCannotReplayTuples(KafkaSpoutConfig<String, String> spoutConfig) {
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+            SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
+
+        spout.nextTuple();
+
+        ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+        assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+        spout.fail(msgIdCaptor.getValue());
+
+        reset(consumerMock);
+
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+            SpoutWithMockedConsumerSetupHelper.createRecords(partition, 1, 1))));
+
+        spout.nextTuple();
+
+        //The consumer should not be seeking to retry the failed tuple, it should just be continuing from the current position
+        verify(consumerMock, never()).seek(eq(partition), anyLong());
+    }
+
+    @Test
+    public void testAtMostOnceModeCannotReplayTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not replay tuples in at-most-once mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .setForceEnableTupleTracking(true)
+            .build();
+        doTestModeCannotReplayTuples(spoutConfig);
+    }
+
+    @Test
+    public void testAnyTimesModeCannotReplayTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not replay tuples in any-times mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES)
+            .setForceEnableTupleTracking(true)
+            .build();
+        doTestModeCannotReplayTuples(spoutConfig);
+    }
+
+    private void doTestModeDoesNotCommitAckedTuples(KafkaSpoutConfig<String, String> spoutConfig) {
+        try (SimulatedTime time = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
+
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
+
+            spout.nextTuple();
+
+            ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+            assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+            spout.ack(msgIdCaptor.getValue());
+            
+            Time.advanceTime(spoutConfig.getOffsetsCommitPeriodMs());
+            
+            spout.nextTuple();
+            
+            verify(consumerMock, never()).commitSync(any());
+        }
+    }
+
+    @Test
+    public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .setForceEnableTupleTracking(true)
+            .build();
+        doTestModeDoesNotCommitAckedTuples(spoutConfig);
+    }
+    
+    @Test
+    public void testAnyTimesModeDoesNotCommitAckedTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not commit acked tuples in any-times mode because committing is managed by the consumer
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(-1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.ANY_TIMES)
+            .setForceEnableTupleTracking(true)
+            .build();
+        doTestModeDoesNotCommitAckedTuples(spoutConfig);
+    }
+
+}
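
Note on testAtMostOnceModeCommitsBeforeEmit above: the ordering it verifies (poll, then commitSync, then emit) can be illustrated without Kafka or Mockito. The sketch below is a simplified model, not the KafkaSpout implementation. The class AtMostOnceOrderSketch and the method runOnce are hypothetical, the consumer and collector calls are represented only as strings in an event log, and skipping the commit on an empty poll is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AtMostOnceOrderSketch {

    // Simulates one nextTuple() call in at-most-once mode and records the call order.
    // The offsets are committed before any record is emitted, so a crash after the
    // commit can lose records but cannot cause them to be emitted a second time.
    static List<String> runOnce(List<String> polledRecords) {
        List<String> events = new ArrayList<>();
        events.add("poll");
        if (!polledRecords.isEmpty()) {
            events.add("commitSync");
            for (String record : polledRecords) {
                events.add("emit:" + record);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> events = runOnce(Arrays.asList("a", "b"));
        System.out.println(events);
        // The commit must precede the first emit.
        System.out.println(events.indexOf("commitSync") < events.indexOf("emit:a"));
    }
}
```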

http://git-wip-us.apache.org/repos/asf/storm/blob/48f69690/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index c361e24..3afb498 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -21,21 +21,17 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.isIn;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.storm.kafka.KafkaUnitRule;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -83,30 +79,15 @@ public class MaxUncommittedOffsetTest {
         this.spout = new KafkaSpout<>(spoutConfig);
     }
 
-    private void populateTopicData(String topicName, int msgCount) throws Exception {
-        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
-
-        for (int i = 0; i < msgCount; i++) {
-            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
-                topicName, Integer.toString(i),
-                Integer.toString(i));
-
-            kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
-        }
-    }
-
-    private void initializeSpout(int msgCount) throws Exception {
-        populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
-        when(topologyContext.getThisTaskIndex()).thenReturn(0);
-        when(topologyContext.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
-        spout.open(conf, topologyContext, collector);
-        spout.activate();
+    private void prepareSpout(int msgCount) throws Exception {
+        SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
     }
 
     private ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) throws Exception {
         assertThat("The message count is less than maxUncommittedOffsets. This test is not meaningful with this configuration.", messageCount, greaterThanOrEqualTo(maxUncommittedOffsets));
         //The spout must respect maxUncommittedOffsets when requesting/emitting tuples
-        initializeSpout(messageCount);
+        prepareSpout(messageCount);
 
         //Try to emit all messages. Ensure only maxUncommittedOffsets are emitted
         ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/48f69690/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 35f95cd..b22ed47 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -19,7 +19,6 @@ package org.apache.storm.kafka.spout;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -28,24 +27,18 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
 import java.util.stream.IntStream;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.KafkaUnitRule;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -73,10 +66,9 @@ public class SingleTopicKafkaSpoutTest {
     private final Map<String, Object> conf = new HashMap<>();
     private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
     private final long commitOffsetPeriodMs = 2_000;
+    private final int maxRetries = 3;
     private KafkaConsumer<String, String> consumerSpy;
-    private KafkaConsumerFactory<String, String> consumerFactory;
     private KafkaSpout<String, String> spout;
-    private int maxRetries = 3;
 
     @Before
     public void setUp() {
@@ -87,29 +79,14 @@ public class SingleTopicKafkaSpoutTest {
                 maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
             .build();
         this.consumerSpy = spy(new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig));
-        this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
-        this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+        this.spout = new KafkaSpout<>(spoutConfig, (ignored) -> consumerSpy);
     }
 
-    void populateTopicData(String topicName, int msgCount) throws InterruptedException, ExecutionException, TimeoutException {
-        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
-
-        for (int i = 0; i < msgCount; i++) {
-            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
-                topicName, Integer.toString(i),
-                Integer.toString(i));
-            kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
-        }
-    }
-
-    private void initializeSpout(int msgCount) throws InterruptedException, ExecutionException, TimeoutException {
-        populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
-        when(topologyContext.getThisTaskIndex()).thenReturn(0);
-        when(topologyContext.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
-        spout.open(conf, topologyContext, collector);
-        spout.activate();
+    private void prepareSpout(int messageCount) throws Exception {
+        SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
     }
-
+    
     /*
      * Asserts that commitSync has been called once, 
      * that there are only commits on one topic,
@@ -127,7 +104,7 @@ public class SingleTopicKafkaSpoutTest {
     public void shouldContinueWithSlowDoubleAcks() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 20;
-            initializeSpout(messageCount);
+            prepareSpout(messageCount);
 
             //play 1st tuple
             ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
@@ -166,7 +143,7 @@ public class SingleTopicKafkaSpoutTest {
     public void shouldEmitAllMessages() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 10;
-            initializeSpout(messageCount);
+            prepareSpout(messageCount);
 
             //Emit all messages and check that they are emitted. Ack the messages too
             IntStream.range(0, messageCount).forEach(value -> {
@@ -194,7 +171,7 @@ public class SingleTopicKafkaSpoutTest {
     public void shouldReplayInOrderFailedMessages() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 10;
-            initializeSpout(messageCount);
+            prepareSpout(messageCount);
 
             //play and ack 1 tuple
             ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
@@ -235,7 +212,7 @@ public class SingleTopicKafkaSpoutTest {
     public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
             int messageCount = 10;
-            initializeSpout(messageCount);
+            prepareSpout(messageCount);
 
             //play 1st tuple
             ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
@@ -280,7 +257,7 @@ public class SingleTopicKafkaSpoutTest {
         //The spout must reemit retriable tuples, even if they fail out of order.
         //The spout should be able to skip tuples it has already emitted when retrying messages, even if those tuples are also retries.
         int messageCount = 10;
-        initializeSpout(messageCount);
+        prepareSpout(messageCount);
 
         //play all tuples
         for (int i = 0; i < messageCount; i++) {
@@ -313,7 +290,7 @@ public class SingleTopicKafkaSpoutTest {
     public void shouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
         //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted
         int messageCount = 1;
-        initializeSpout(messageCount);
+        prepareSpout(messageCount);
 
         //Emit and fail the same tuple until we've reached retry limit
         for (int i = 0; i <= maxRetries; i++) {

http://git-wip-us.apache.org/repos/asf/storm/blob/48f69690/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
new file mode 100644
index 0000000..1c2158f
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.kafka.KafkaUnit;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+
+public class SingleTopicKafkaUnitSetupHelper {
+
+    /**
+     * Using the given KafkaUnit instance, put some messages in the specified topic.
+     *
+     * @param kafkaUnit The KafkaUnit instance to use
+     * @param topicName The topic to produce messages for
+     * @param msgCount The number of messages to produce
+     */
+    public static void populateTopicData(KafkaUnit kafkaUnit, String topicName, int msgCount) throws Exception {
+        kafkaUnit.createTopic(topicName);
+
+        for (int i = 0; i < msgCount; i++) {
+            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
+                topicName, Integer.toString(i),
+                Integer.toString(i));
+            kafkaUnit.sendMessage(producerRecord);
+        }
+    }
+
+    /**
+     * Open and activate a KafkaSpout that acts as a single-task/executor spout.
+     *
+     * @param <K> Kafka key type
+     * @param <V> Kafka value type
+     * @param spout The spout to prepare
+     * @param topoConf The topoConf
+     * @param topoContextMock The TopologyContext mock
+     * @param collectorMock The output collector mock
+     */
+    public static <K, V> void initializeSpout(KafkaSpout<K, V> spout, Map<String, Object> topoConf, TopologyContext topoContextMock,
+        SpoutOutputCollector collectorMock) throws Exception {
+        when(topoContextMock.getThisTaskIndex()).thenReturn(0);
+        when(topoContextMock.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
+        spout.open(topoConf, topoContextMock, collectorMock);
+        spout.activate();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/48f69690/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
index 2bf1f36..93a771d 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
@@ -33,13 +33,6 @@ public class SingleTopicKafkaSpoutConfiguration {
     public static final String STREAM = "test_stream";
     public static final String TOPIC = "test";
 
-    /**
-     * Retry in a tight loop (keep unit tests fasts).
-     */
-    public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
-        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
-            DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
-
     public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(int port) {
         return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC));
     }
@@ -53,14 +46,18 @@ public class SingleTopicKafkaSpoutConfiguration {
             new Fields("topic", "key", "value"), STREAM)
             .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
             .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
-            .setRetry(getRetryService())
+            .setRetry(getNoDelayRetryService())
             .setOffsetCommitPeriodMs(10_000)
             .setFirstPollOffsetStrategy(EARLIEST)
             .setMaxUncommittedOffsets(250)
             .setPollTimeoutMs(1000);
     }
 
-    protected static KafkaSpoutRetryService getRetryService() {
-        return UNIT_TEST_RETRY_SERVICE;
+    protected static KafkaSpoutRetryService getNoDelayRetryService() {
+        /**
+         * Retry in a tight loop (keep unit tests fast).
+         */
+        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
     }
 }


[2/2] storm git commit: Merge branch 'STORM-2648-merge'

Posted by ka...@apache.org.
Merge branch 'STORM-2648-merge'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/293643f0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/293643f0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/293643f0

Branch: refs/heads/master
Commit: 293643f0956abbd2ccdf2b85c70fee0e1dce6be0
Parents: 5c05bb7 48f6969
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Sep 29 14:26:30 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Sep 29 14:26:30 2017 +0900

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      |  31 ++-
 .../apache/storm/kafka/spout/KafkaSpout.java    | 157 +++++++-------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  85 +++++++-
 .../spout/KafkaSpoutMessagingGuaranteeTest.java | 207 +++++++++++++++++++
 .../kafka/spout/MaxUncommittedOffsetTest.java   |  27 +--
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |  47 ++---
 .../spout/SingleTopicKafkaUnitSetupHelper.java  |  67 ++++++
 .../SingleTopicKafkaSpoutConfiguration.java     |  17 +-
 8 files changed, 468 insertions(+), 170 deletions(-)
----------------------------------------------------------------------