You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/03 14:41:28 UTC

[10/40] storm git commit: STORM-2343: Fix new Kafka spout stopping processing if more than maxUncommittedOffsets tuples fail at once

STORM-2343: Fix new Kafka spout stopping processing if more than maxUncommittedOffsets tuples fail at once


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fe16fd40
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fe16fd40
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fe16fd40

Branch: refs/heads/1.1.x-branch
Commit: fe16fd40c69d872e0a2cd5a5dd0bc39fbf72cd9d
Parents: 29c349d
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Sat Apr 8 08:26:42 2017 +0200
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 29 16:29:34 2017 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  26 +-
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  10 +-
 .../KafkaSpoutRetryExponentialBackoff.java      |  78 ++++--
 .../kafka/spout/KafkaSpoutRetryService.java     |  10 +-
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   | 256 +++++++++++++++++++
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |  11 +-
 .../kafka/spout/MaxUncommittedOffsetTest.java   | 245 ++++++++++++++++++
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |  45 +++-
 .../SingleTopicKafkaSpoutConfiguration.java     |  19 +-
 9 files changed, 633 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 207ba23..0bfcfea 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -79,7 +80,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient Map<TopicPartition, OffsetManager> acked;         // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, or after a consumer rebalance, or during close/deactivate
     private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
     private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;     // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
-    private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed. Not used if it's AutoCommitMode
+    private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled.
     private transient Timer refreshSubscriptionTimer;                   // Triggers when a subscription should be refreshed
     private transient TopologyContext context;
 
@@ -250,9 +251,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private boolean poll() {
         final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
-        final boolean poll = !waitingToEmit()
-                && (numUncommittedOffsets < maxUncommittedOffsets || consumerAutoCommitMode);
-
+        final int readyMessageCount = retryService.readyMessageCount();
+        final boolean poll = !waitingToEmit() &&
+            //Check that the number of uncommitted, nonretriable tuples is less than the maxUncommittedOffsets limit
+            //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis, and prevents locking up the spout when there are too many retriable tuples
+            (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets ||
+            consumerAutoCommitMode);
+        
         if (!poll) {
             if (waitingToEmit()) {
                 LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
@@ -290,15 +295,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     private void doSeekRetriableTopicPartitions() {
-        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
+        final Map<TopicPartition, Long> retriableTopicPartitions = retryService.earliestRetriableOffsets();
 
-        for (TopicPartition rtp : retriableTopicPartitions) {
-            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
-            if (offsetAndMeta != null) {
-                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
-            } else {
-                kafkaConsumer.seek(rtp, acked.get(rtp).getCommittedOffset() + 1);    // Seek to last committed offset
-            }
+        for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : retriableTopicPartitions.entrySet()) {
+            //Seek directly to the earliest retriable message for each retriable topic partition
+            kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
         }
     }
 
@@ -318,7 +319,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
         final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
-
         if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) {   // has been acked
             LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
         } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail

http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 920dca9..5f8071f 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -269,12 +269,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         
         /**
          * The maximum number of records a poll will return.
-         * Will only work with Kafka 0.10.0 and above.
          */
         public Builder<K,V> setMaxPollRecords(int records) {
-            //to avoid issues with 0.9 versions that technically still work
-            // with this we do not use ConsumerConfig.MAX_POLL_RECORDS_CONFIG
-            return setProp("max.poll.records", records);
+            return setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);
         }
         
         //Security Related Configs
@@ -330,11 +327,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             this.offsetCommitPeriodMs = offsetCommitPeriodMs;
             return this;
         }
-        
+
         /**
          * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
          * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
-         * of pending offsets bellow the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
+         * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
+         * Note that this limit can in some cases be exceeded, but no partition will exceed this limit by more than maxPollRecords - 1.
          * @param maxUncommittedOffsets max number of records that can be be pending commit
          */
         public Builder<K,V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {

http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index 2c8d7e4..2584685 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -25,15 +25,18 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import org.apache.storm.utils.Time;
 
 /**
  * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
- * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1)    where failCount = 1, 2, 3, ...
+ * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1)    where failCount = 1, 2, 3, ...
  * nextRetry = Min(nextRetry, currentTime + maxDelay)
  */
 public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService {
@@ -54,7 +57,14 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
         @Override
         public int compare(RetrySchedule entry1, RetrySchedule entry2) {
-            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
+            int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
+            
+            if(result == 0) {
+                //TreeSet uses the comparator's compare() instead of equals() for the Set contract
+                //Ensure that we can save two retry schedules with the same timestamp
+                result = entry1.hashCode() - entry2.hashCode();
+            }
+            return result;
         }
     }
 
@@ -62,13 +72,13 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
         private final KafkaSpoutMessageId msgId;
         private long nextRetryTimeNanos;
 
-        public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
+        public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTimeNanos) {
             this.msgId = msgId;
-            this.nextRetryTimeNanos = nextRetryTime;
+            this.nextRetryTimeNanos = nextRetryTimeNanos;
             LOG.debug("Created {}", this);
         }
 
-        public void setNextRetryTime() {
+        public void setNextRetryTimeNanos() {
             nextRetryTimeNanos = nextTime(msgId);
             LOG.debug("Updated {}", this);
         }
@@ -81,7 +91,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
         public String toString() {
             return "RetrySchedule{" +
                     "msgId=" + msgId +
-                    ", nextRetryTime=" + nextRetryTimeNanos +
+                    ", nextRetryTimeNanos=" + nextRetryTimeNanos +
                     '}';
         }
 
@@ -96,19 +106,19 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
 
     public static class TimeInterval implements Serializable {
         private final long lengthNanos;
-        private final long length;
         private final TimeUnit timeUnit;
+        private final long length;
 
         /**
          * @param length length of the time interval in the units specified by {@link TimeUnit}
          * @param timeUnit unit used to specify a time interval on which to specify a time unit
          */
         public TimeInterval(long length, TimeUnit timeUnit) {
-            this.length = length;
-            this.timeUnit = timeUnit;
             this.lengthNanos = timeUnit.toNanos(length);
+            this.timeUnit = timeUnit;
+            this.length = length;
         }
-
+        
         public static TimeInterval seconds(long length) {
             return new TimeInterval(length, TimeUnit.SECONDS);
         }
@@ -116,19 +126,15 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
         public static TimeInterval milliSeconds(long length) {
             return new TimeInterval(length, TimeUnit.MILLISECONDS);
         }
-
+        
         public static TimeInterval microSeconds(long length) {
             return new TimeInterval(length, TimeUnit.MICROSECONDS);
         }
-
+        
         public long lengthNanos() {
             return lengthNanos;
         }
-
-        public long length() {
-            return length;
-        }
-
+        
         public TimeUnit timeUnit() {
             return timeUnit;
         }
@@ -165,26 +171,32 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     }
 
     @Override
-    public Set<TopicPartition> retriableTopicPartitions() {
-        final Set<TopicPartition> tps = new HashSet<>();
-        final long currentTimeNanos = System.nanoTime();
+    public Map<TopicPartition, Long> earliestRetriableOffsets() {
+        final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>();
+        final long currentTimeNanos = Time.nanoTime();
         for (RetrySchedule retrySchedule : retrySchedules) {
             if (retrySchedule.retry(currentTimeNanos)) {
                 final KafkaSpoutMessageId msgId = retrySchedule.msgId;
-                tps.add(new TopicPartition(msgId.topic(), msgId.partition()));
+                final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition());
+                final Long currentLowestOffset = tpToEarliestRetriableOffset.get(tpForMessage);
+                if(currentLowestOffset != null) {
+                    tpToEarliestRetriableOffset.put(tpForMessage, Math.min(currentLowestOffset, msgId.offset()));
+                } else {
+                    tpToEarliestRetriableOffset.put(tpForMessage, msgId.offset());
+                }
             } else {
                 break;  // Stop searching as soon as passed current time
             }
         }
-        LOG.debug("Topic partitions with entries ready to be retried [{}] ", tps);
-        return tps;
+        LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset);
+        return tpToEarliestRetriableOffset;
     }
 
     @Override
     public boolean isReady(KafkaSpoutMessageId msgId) {
         boolean retry = false;
         if (toRetryMsgs.contains(msgId)) {
-            final long currentTimeNanos = System.nanoTime();
+            final long currentTimeNanos = Time.nanoTime();
             for (RetrySchedule retrySchedule : retrySchedules) {
                 if (retrySchedule.retry(currentTimeNanos)) {
                     if (retrySchedule.msgId.equals(msgId)) {
@@ -265,13 +277,27 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
             return true;
         }
     }
+    
+    @Override
+    public int readyMessageCount() {
+        int count = 0;
+        final long currentTimeNanos = Time.nanoTime();
+        for (RetrySchedule retrySchedule : retrySchedules) {
+            if (retrySchedule.retry(currentTimeNanos)) {
+                ++count;
+            } else {
+                break; //Stop counting when past current time
+            }
+        }
+        return count;
+    }
 
     // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
     private long nextTime(KafkaSpoutMessageId msgId) {
-        final long currentTimeNanos = System.nanoTime();
+        final long currentTimeNanos = Time.nanoTime();
         final long nextTimeNanos = msgId.numFails() == 1                // numFails = 1, 2, 3, ...
                 ? currentTimeNanos + initialDelay.lengthNanos
-                : (currentTimeNanos + delayPeriod.timeUnit.toNanos((long) Math.pow(delayPeriod.length, msgId.numFails() - 1)));
+                : currentTimeNanos + delayPeriod.lengthNanos * (long)(Math.pow(2, msgId.numFails()-1));
         return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index bf17a5a..f0230c3 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition;
 
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -54,10 +55,11 @@ public interface KafkaSpoutRetryService extends Serializable {
     boolean retainAll(Collection<TopicPartition> topicPartitions);
 
     /**
-     * @return set of topic partitions that have offsets that are ready to be retried, i.e.,
-     * for which a tuple has failed and has retry time less than current time
+     * @return The earliest retriable offset for each TopicPartition that has
+     * offsets ready to be retried, i.e. for which a tuple has failed
+     * and has retry time less than current time
      */
-    Set<TopicPartition> retriableTopicPartitions();
+    Map<TopicPartition, Long> earliestRetriableOffsets();
 
     /**
      * Checks if a specific failed {@link KafkaSpoutMessageId} is ready to be retried,
@@ -75,4 +77,6 @@ public interface KafkaSpoutRetryService extends Serializable {
      * Returns false is this message is not scheduled for retrial
      */
     boolean isScheduled(KafkaSpoutMessageId msgId);
+    
+    int readyMessageCount();
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
new file mode 100755
index 0000000..447f8c4
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.never;
+
+import java.util.HashSet;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.mockito.InOrder;
+
+public class KafkaSpoutEmitTest {
+
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpout<String, String> spout;
+    private KafkaSpoutConfig spoutConfig;
+
+    private void setupSpout(Set<TopicPartition> assignedPartitions) {
+        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .build();
+
+        consumerMock = mock(KafkaConsumer.class);
+        KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactory<String, String>() {
+            @Override
+            public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
+                return consumerMock;
+            }
+        };
+
+        //Set up a spout listening to 1 topic partition
+        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+        spout.open(conf, contextMock, collectorMock);
+        spout.activate();
+
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+        //Assign partitions to the spout
+        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+    }
+
+    @Test
+    public void testNextTupleEmitsAtMostOneTuple() {
+        //The spout should emit at most one message per call to nextTuple
+        //This is necessary for Storm to be able to throttle the spout according to maxSpoutPending
+        setupSpout(Collections.singleton(partition));
+        Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+        List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+        }
+        records.put(partition, recordsForPartition);
+
+        when(consumerMock.poll(anyLong()))
+            .thenReturn(new ConsumerRecords(records));
+
+        spout.nextTuple();
+
+        verify(collectorMock, times(1)).emit(anyString(), anyList(), anyObject());
+    }
+
+    @Test
+    public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() {
+        //The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded
+        
+        //Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            setupSpout(Collections.singleton(partition));
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
+                //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+            }
+            records.put(partition, recordsForPartition);
+
+            when(consumerMock.poll(anyLong()))
+                .thenReturn(new ConsumerRecords(records));
+
+            for (int i = 0; i < recordsForPartition.size(); i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
+
+            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+                spout.fail(messageId);
+            }
+
+            reset(collectorMock);
+
+            Time.advanceTime(50);
+            //No backoff for test retry service, just check that messages will retry immediately
+            for (int i = 0; i < recordsForPartition.size(); i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<KafkaSpoutMessageId> retryMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), retryMessageIds.capture());
+
+            //Verify that the poll started at the earliest retriable tuple offset
+            List<Long> failedOffsets = new ArrayList<>();
+            for(KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+                failedOffsets.add(msgId.offset());
+            }
+            InOrder inOrder = inOrder(consumerMock);
+            inOrder.verify(consumerMock).seek(partition, failedOffsets.get(0));
+            inOrder.verify(consumerMock).poll(anyLong());
+        }
+    }
+    
+    @Test
+    public void testNextTupleEmitsAtMostMaxUncommittedOffsetsPlusMaxPollRecordsWhenRetryingTuples() {
+        /*
+        The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded.
+        numUncommittedOffsets is equal to numNonRetriableEmittedTuples + numRetriableTuples.
+        The spout will only poll if numUncommittedOffsets - numRetriableTuples < maxUncommittedOffsets (i.e. numNonRetriableEmittedTuples < maxUncommittedOffsets)
+        This means that the latest offset a poll can start at for a retriable partition,
+        counting from the last committed offset, is maxUncommittedOffsets,
+        where there are maxUncommittedOffsets - 1 uncommitted tuples "to the left".
+        If the retry poll starts at that offset, it at most emits the retried tuple plus maxPollRecords - 1 new tuples.
+        The limit on uncommitted offsets for one partition is therefore maxUncommittedOffsets + maxPollRecords - 1.
+        
+        It is only necessary to test this for a single partition, because partitions can't contribute negatively to numNonRetriableEmittedTuples,
+        so if the limit holds for one partition, it will also hold for each individual partition when multiple are involved.
+        
+        This makes the actual limit numPartitions * (maxUncommittedOffsets + maxPollRecords - 1)
+         */
+        
+        //Emit maxUncommittedOffsets messages, and fail only the last. Then ensure that the spout will allow no more than maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets when retrying
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            setupSpout(Collections.singleton(partition));
+            
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPollRecords = new HashMap<>();
+            List<ConsumerRecord<String, String>> firstPollRecordsForPartition = new ArrayList<>();
+            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
+                //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+                firstPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+            }
+            firstPollRecords.put(partition, firstPollRecordsForPartition);
+            
+            int maxPollRecords = 5;
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPollRecords = new HashMap<>();
+            List<ConsumerRecord<String, String>> secondPollRecordsForPartition = new ArrayList<>();
+            for(int i = 0; i < maxPollRecords; i++) {
+                secondPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
+            }
+            secondPollRecords.put(partition, secondPollRecordsForPartition);
+
+            when(consumerMock.poll(anyLong()))
+                .thenReturn(new ConsumerRecords(firstPollRecords))
+                .thenReturn(new ConsumerRecords(secondPollRecords));
+
+            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() + maxPollRecords; i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock, times(firstPollRecordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
+
+            KafkaSpoutMessageId failedMessageId = messageIds.getAllValues().get(messageIds.getAllValues().size() - 1);
+            spout.fail(failedMessageId);
+
+            reset(collectorMock);
+
+            //Now make the single failed tuple retriable
+            Time.advanceTime(50);
+            //The spout should allow another poll since there are now only maxUncommittedOffsets - 1 nonretriable tuples
+            for (int i = 0; i < firstPollRecordsForPartition.size() + maxPollRecords; i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<KafkaSpoutMessageId> retryBatchMessageIdsCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock, times(maxPollRecords)).emit(anyString(), anyList(), retryBatchMessageIdsCaptor.capture());
+            reset(collectorMock);
+            
+            //Check that the consumer started polling at the failed tuple offset
+            InOrder inOrder = inOrder(consumerMock);
+            inOrder.verify(consumerMock).seek(partition, failedMessageId.offset());
+            inOrder.verify(consumerMock).poll(anyLong());
+            
+            //Now fail all except one of the last batch, and check that the spout won't reemit any tuples because there are more than maxUncommittedOffsets nonretriable tuples
+            Time.advanceTime(50);
+            List<KafkaSpoutMessageId> retryBatchMessageIds = retryBatchMessageIdsCaptor.getAllValues();
+            KafkaSpoutMessageId firstTupleFromRetryBatch = retryBatchMessageIds.remove(0);
+            for(KafkaSpoutMessageId msgId : retryBatchMessageIds) {
+                spout.fail(msgId);
+            }
+            for (int i = 0; i < firstPollRecordsForPartition.size() + maxPollRecords; i++) {
+                spout.nextTuple();
+            }
+            verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
+            
+            //Fail the one remaining tuple from the retry batch, which brings the number of nonretriable tuples back under the limit, and check that the spout polls again
+            spout.fail(firstTupleFromRetryBatch);
+            spout.nextTuple();
+            verify(collectorMock, times(1)).emit(anyString(), anyList(), anyObject());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 6a0a63e..81e3807 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -15,7 +15,7 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.hasKey;
 import static org.junit.Assert.assertThat;
@@ -126,7 +126,9 @@ public class KafkaSpoutRebalanceTest {
     public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, this.offsetCommitPeriodMs), consumerFactoryMock);
+            KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+                .build(), consumerFactoryMock);
             String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
             TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
             TopicPartition assignedPartition = new TopicPartition(topic, 2);
@@ -155,7 +157,10 @@ public class KafkaSpoutRebalanceTest {
     public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
         KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
-        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10, retryServiceMock), consumerFactoryMock);
+        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+            .setOffsetCommitPeriodMs(10)
+            .setRetry(retryServiceMock)
+            .build(), consumerFactoryMock);
         String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
         TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
         TopicPartition assignedPartition = new TopicPartition(topic, 2);

http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
new file mode 100755
index 0000000..a7ad4c2
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -0,0 +1,245 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.isIn;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.kafka.KafkaUnitRule;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockitoAnnotations;
+
+public class MaxUncommittedOffsetTest {
+
+    @Rule
+    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+    private final TopologyContext topologyContext = mock(TopologyContext.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+    private final long commitOffsetPeriodMs = 2_000;
+    private final int numMessages = 100;
+    private final int maxUncommittedOffsets = 10;
+    private final int maxPollRecords = 5;
+    private final int initialRetryDelaySecs = 60;
+    private final KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+        .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+        .setMaxPollRecords(maxPollRecords)
+        .setMaxUncommittedOffsets(maxUncommittedOffsets)
+        .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+            1, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs))) //Retry once after a minute
+        .build();
+    private KafkaSpout<String, String> spout;
+
+    @Before
+    public void setUp() {
+        //Sanity-check the test constants. The tests check that a hard cap of maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets exists
+        //so Kafka must be able to return more messages than that in order for the tests to be meaningful
+        assertThat("Current tests require numMessages >= 2*maxUncommittedOffsets", numMessages, greaterThanOrEqualTo(maxUncommittedOffsets * 2));
+        //This is to verify that a low maxPollRecords does not interfere with reemitting failed tuples
+        //The spout must be able to reemit all retriable tuples, even if the maxPollRecords is set to a low value compared to maxUncommittedOffsets.
+        assertThat("Current tests require maxPollRecords <= maxUncommittedOffsets", maxPollRecords, lessThanOrEqualTo(maxUncommittedOffsets));
+        MockitoAnnotations.initMocks(this);
+        this.spout = new KafkaSpout<>(spoutConfig);
+    }
+
+    private void populateTopicData(String topicName, int msgCount) throws Exception {
+        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
+
+        for (int i = 0; i < msgCount; i++) {
+            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
+                topicName, Integer.toString(i),
+                Integer.toString(i));
+
+            kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
+        }
+    }
+
+    private void initializeSpout(int msgCount) throws Exception {
+        populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+        spout.open(conf, topologyContext, collector);
+        spout.activate();
+    }
+
+    private ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) throws Exception {
+        assertThat("The message count is less than maxUncommittedOffsets. This test is not meaningful with this configuration.", messageCount, greaterThanOrEqualTo(maxUncommittedOffsets));
+        //The spout must respect maxUncommittedOffsets when requesting/emitting tuples
+        initializeSpout(messageCount);
+
+        //Try to emit all messages. Ensure only maxUncommittedOffsets are emitted, and return the captor holding their message ids
+        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+        verify(collector, times(maxUncommittedOffsets)).emit(
+            anyString(),
+            anyList(),
+            messageIds.capture());
+        return messageIds;
+    }
+
+    @Test
+    public void testNextTupleCanEmitMoreMessagesWhenDroppingBelowMaxUncommittedOffsetsDueToCommit() throws Exception {
+        //The spout must respect maxUncommittedOffsets after committing a set of records
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+            //First check that maxUncommittedOffsets is respected when emitting from scratch
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+            reset(collector);
+
+            //Ack all emitted messages and commit them
+            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+                spout.ack(messageId);
+            }
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            spout.nextTuple();
+
+            //Now check that the spout will emit another maxUncommittedOffsets messages
+            for (int i = 0; i < numMessages; i++) {
+                spout.nextTuple();
+            }
+            verify(collector, times(maxUncommittedOffsets)).emit(
+                anyString(),
+                anyList(),
+                anyObject());
+        }
+    }
+
+    @Test
+    public void testNextTupleWillRespectMaxUncommittedOffsetsWhenThereAreAckedUncommittedTuples() throws Exception {
+        //The spout must respect maxUncommittedOffsets even if some tuples have been acked but not committed
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+            //First check that maxUncommittedOffsets is respected when emitting from scratch
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+            reset(collector);
+
+            //Fail all emitted messages except the last one. Try to commit.
+            List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
+            for (int i = 0; i < messageIdList.size() - 1; i++) {
+                spout.fail(messageIdList.get(i));
+            }
+            spout.ack(messageIdList.get(messageIdList.size() - 1));
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            spout.nextTuple();
+
+            //Now check that the spout will not emit anything else since nothing has been committed
+            for (int i = 0; i < numMessages; i++) {
+                spout.nextTuple();
+            }
+
+            verify(collector, times(0)).emit(
+                anyString(),
+                anyList(),
+                anyObject());
+        }
+    }
+
+    private void failAllExceptTheFirstMessageThenCommit(ArgumentCaptor<KafkaSpoutMessageId> messageIds) {
+        //Fail all captured messages except the first and ack the first. Then advance simulated time past the commit period and call nextTuple, which is expected to commit the acked offset.
+        List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
+        for (int i = 1; i < messageIdList.size(); i++) {
+            spout.fail(messageIdList.get(i));
+        }
+        spout.ack(messageIdList.get(0));
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        spout.nextTuple();
+    }
+
+    @Test
+    public void testNextTupleWillNotEmitMoreThanMaxUncommittedOffsetsPlusMaxPollRecordsMessages() throws Exception {
+        //The upper bound on uncommitted offsets should be maxUncommittedOffsets + maxPollRecords - 1
+        //This is reachable by emitting maxUncommittedOffsets messages, acking the first message, then polling.
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+            //First check that maxUncommittedOffsets is respected when emitting from scratch
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+            reset(collector);
+
+            failAllExceptTheFirstMessageThenCommit(messageIds);
+
+            //Offset 0 is acked, 1 to maxUncommittedOffsets - 1 are failed
+            //The spout should now emit another maxPollRecords messages
+            //This is allowed because the acked message brings the numUncommittedOffsets below the cap
+            for (int i = 0; i < maxUncommittedOffsets; i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<KafkaSpoutMessageId> secondRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collector, times(maxPollRecords)).emit(
+                anyString(),
+                anyList(),
+                secondRunMessageIds.capture());
+            reset(collector);
+
+            List<Long> firstRunOffsets = new ArrayList<>();
+            for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+                firstRunOffsets.add(msgId.offset());
+            }
+            List<Long> secondRunOffsets = new ArrayList<>();
+            for (KafkaSpoutMessageId msgId : secondRunMessageIds.getAllValues()) {
+                secondRunOffsets.add(msgId.offset());
+            }
+            assertThat("Expected the newly emitted messages to have no overlap with the first batch", secondRunOffsets.removeAll(firstRunOffsets), is(false));
+
+            //Offset 0 is acked, 1 to maxUncommittedOffsets-1 are failed, maxUncommittedOffsets to maxUncommittedOffsets + maxPollRecords-1 are emitted
+            //There are now maxUncommittedOffsets-1 + maxPollRecords records emitted past the last committed offset
+            //Advance time so the failed tuples become ready for retry, and check that the spout will emit retriable tuples as long as numNonRetriableEmittedTuples < maxUncommittedOffsets
+            
+            int numNonRetriableEmittedTuples = maxPollRecords; //The other tuples were failed and are becoming retriable
+            int allowedPolls = (int)Math.ceil((maxUncommittedOffsets - numNonRetriableEmittedTuples)/(double)maxPollRecords);
+            Time.advanceTimeSecs(initialRetryDelaySecs);
+            for (int i = 0; i < numMessages; i++) {
+                spout.nextTuple();
+            }
+            ArgumentCaptor<KafkaSpoutMessageId> thirdRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collector, times(allowedPolls*maxPollRecords)).emit(
+                anyString(),
+                anyList(),
+                thirdRunMessageIds.capture());
+            reset(collector);
+
+            List<Long> thirdRunOffsets = new ArrayList<>();
+            for (KafkaSpoutMessageId msgId : thirdRunMessageIds.getAllValues()) {
+                thirdRunOffsets.add(msgId.offset());
+            }
+
+            assertThat("Expected the emitted messages to be retries of the failed tuples from the first batch", thirdRunOffsets, everyItem(isIn(firstRunOffsets)));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 1121642..884709d 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -17,7 +17,8 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -37,6 +38,7 @@ import java.util.concurrent.TimeoutException;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -46,6 +48,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
@@ -76,7 +81,9 @@ public class SingleTopicKafkaSpoutTest {
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        KafkaSpoutConfig spoutConfig = getKafkaSpoutConfig(kafkaUnitRule.getKafkaUnit().getKafkaPort(), commitOffsetPeriodMs);
+        KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+            .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+            .build();
         this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
         this.consumerFactory = new KafkaConsumerFactory<String, String>() {
             @Override
@@ -275,4 +282,38 @@ public class SingleTopicKafkaSpoutTest {
             verifyAllMessagesCommitted(messageCount);
         }
     }
+    
+    @Test
+    public void shouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception {
+        //The spout must reemit retriable tuples, even if they fail out of order.
+        //The spout should be able to skip tuples it has already emitted when retrying messages, even if those tuples are also retries.
+        int messageCount = 10;
+        initializeSpout(messageCount);
+
+        //play all tuples
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collector, times(messageCount)).emit(anyString(), anyList(), messageIds.capture());
+        reset(collector);
+        //Fail tuple 5 and 3, call nextTuple, then fail tuple 2
+        List<KafkaSpoutMessageId> capturedMessageIds = messageIds.getAllValues();
+        spout.fail(capturedMessageIds.get(5));
+        spout.fail(capturedMessageIds.get(3));
+        spout.nextTuple();
+        spout.fail(capturedMessageIds.get(2));
+
+        //Check that the spout will reemit all 3 failed tuples and no other tuples
+        ArgumentCaptor<KafkaSpoutMessageId> reemittedMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+        verify(collector, times(3)).emit(anyString(), anyList(), reemittedMessageIds.capture());
+        Set<KafkaSpoutMessageId> expectedReemitIds = new HashSet<>();
+        expectedReemitIds.add(capturedMessageIds.get(5));
+        expectedReemitIds.add(capturedMessageIds.get(3));
+        expectedReemitIds.add(capturedMessageIds.get(2));
+        assertThat("Expected reemits to be the 3 failed tuples", new HashSet<>(reemittedMessageIds.getAllValues()), is(expectedReemitIds));
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 99bd3de..d5c052b 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -46,19 +46,11 @@ public class SingleTopicKafkaSpoutConfiguration {
 
     public static StormTopology getTopologyKafkaSpout(int port) {
         final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(port)), 1);
+        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfigBuilder(port).build()), 1);
         tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
         return tp.createTopology();
     }
 
-    static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(int port) {
-        return getKafkaSpoutConfig(port, 10_000);
-    }
-
-    static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(int port, long offsetCommitPeriodMs) {
-        return getKafkaSpoutConfig(port, offsetCommitPeriodMs, getRetryService());
-    }
-
     private static Func<ConsumerRecord<String, String>, List<Object>> TOPIC_KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
         @Override
         public List<Object> apply(ConsumerRecord<String, String> r) {
@@ -66,18 +58,17 @@ public class SingleTopicKafkaSpoutConfiguration {
         }
     };
     
-    static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
+    public static KafkaSpoutConfig.Builder<String,String> getKafkaSpoutConfigBuilder(int port) {
         return KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)
                 .setRecordTranslator(TOPIC_KEY_VALUE_FUNC,
                         new Fields("topic", "key", "value"), STREAM)
                 .setGroupId("kafkaSpoutTestGroup")
                 .setMaxPollRecords(5)
-                .setRetry(retryService)
-                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+                .setRetry(getRetryService())
+                .setOffsetCommitPeriodMs(10_000)
                 .setFirstPollOffsetStrategy(EARLIEST)
                 .setMaxUncommittedOffsets(250)
-                .setPollTimeoutMs(1000)
-                .build();
+                .setPollTimeoutMs(1000);
     }
         
     protected static KafkaSpoutRetryService getRetryService() {