You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/07 21:05:24 UTC

[2/7] storm git commit: STORM-2936 Overwrite latest storm-kafka-client 1.x-branch into 1.1.x-branch

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
new file mode 100644
index 0000000..0bf9219
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.tuple.Values;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.utils.Time;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.verify;
+import static org.junit.Assert.assertEquals;
+
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.hamcrest.Matchers;
+
+public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest {
+    private final int maxPollRecords = 10; // Set on the consumer via ConsumerConfig.MAX_POLL_RECORDS_CONFIG in createSpoutConfig()
+    private final int maxRetries = 3; // Retry cap passed to KafkaSpoutRetryExponentialBackoff in createSpoutConfig()
+
+    public KafkaSpoutSingleTopicTest() {
+        super(2_000); // NOTE(review): presumably the commit period in ms exposed as commitOffsetPeriodMs; confirm in KafkaSpoutAbstractTest
+    }
+
+    @Override
+    KafkaSpoutConfig<String, String> createSpoutConfig() { // Builds the spout configuration used by the tests in this class
+        return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+            KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+                Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))) // The topic is given as a Pattern compiled from the TOPIC constant
+            .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+            .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+                maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0))) // Every TimeInterval is 0 seconds; retries are capped at maxRetries
+            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
+            .build();
+    }
+
+    @Test
+    public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() throws Exception {
+        final int messageCount = maxPollRecords * 2;
+        prepareSpout(messageCount);
+
+        //Emit all messages (twice maxPollRecords), then fail the first one while acking the rest
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+        ArgumentCaptor<KafkaSpoutMessageId> messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock, times(messageCount)).emit(anyString(), anyList(), messageIdCaptor.capture());
+        List<KafkaSpoutMessageId> messageIds = messageIdCaptor.getAllValues();
+        for (int i = 1; i < messageIds.size(); i++) {
+            spout.ack(messageIds.get(i));
+        }
+        KafkaSpoutMessageId failedTuple = messageIds.get(0);
+        spout.fail(failedTuple);
+
+        //Replay the failed tuple. Time is not advanced here; the retry intervals in createSpoutConfig() are all 0 seconds.
+        reset(collectorMock);
+        spout.nextTuple();
+        ArgumentCaptor<KafkaSpoutMessageId> failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock).emit(anyString(), anyList(), failedIdReplayCaptor.capture());
+
+        assertThat("Expected replay of failed tuple", failedIdReplayCaptor.getValue(), is(failedTuple));
+
+        /* Advance the time past the commit period, ack the replayed tuple, and commit.
+         * Since the tuple is more than max poll records behind the most recent emitted tuple, the consumer won't catch up in this poll.
+         */
+        reset(collectorMock);
+        Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs);
+        spout.ack(failedIdReplayCaptor.getValue());
+        spout.nextTuple();
+        verify(consumerSpy).commitSync(commitCapture.capture());
+
+        Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue();
+        TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+        assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp));
+        assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount));
+
+            /* Verify that the following acked (now committed) tuples are not emitted again.
+             * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened,
+             * this verifies that the spout keeps the consumer position ahead of the committed offset when committing.
+             */
+        //Do a few more polls to check that nothing more is emitted
+        for(int i = 0; i < 3; i++) {
+            spout.nextTuple();
+        }
+        verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
+    }
+
+    @Test
+    public void testShouldContinueWithSlowDoubleAcks() throws Exception {
+        final int messageCount = 20;
+        prepareSpout(messageCount);
+
+        //Emit the 1st tuple and ack it right away
+        ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), messageIdToDoubleAck.capture());
+        spout.ack(messageIdToDoubleAck.getValue());
+
+        //Emit some more messages (messageCount / 2 calls to nextTuple)
+        for(int i = 0; i < messageCount / 2; i++) {
+            spout.nextTuple();
+        }
+
+        spout.ack(messageIdToDoubleAck.getValue()); //Second, late ack of the 1st tuple (the "slow double ack")
+
+        //Emit any remaining messages
+        for(int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+
+        //Verify that all messages were emitted exactly messageCount times in total, then ack all of them
+        ArgumentCaptor<Object> messageIds = ArgumentCaptor.forClass(Object.class);
+        verify(collectorMock, times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+            anyList(),
+            messageIds.capture());
+        for(Object id : messageIds.getAllValues()) {
+            spout.ack(id);
+        }
+
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        //Commit offsets
+        spout.nextTuple();
+
+        verifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void testShouldEmitAllMessages() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        //Emit the messages one at a time; check each is emitted on STREAM with values (TOPIC, i, i), ack it, then reset the mock
+        for(int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+            ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
+            verify(collectorMock).emit(
+                eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+                eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
+                    Integer.toString(i),
+                    Integer.toString(i))),
+                messageId.capture());
+            spout.ack(messageId.getValue());
+            reset(collectorMock);
+        }
+
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        //Commit offsets (the time advance above exceeds the commit period)
+        spout.nextTuple();
+
+        verifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void testShouldReplayInOrderFailedMessages() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        //Emit and ack the 1st tuple
+        ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), messageIdAcked.capture());
+        spout.ack(messageIdAcked.getValue());
+        reset(collectorMock);
+
+        //Emit and fail the 2nd tuple
+        ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), messageIdFailed.capture());
+        spout.fail(messageIdFailed.getValue());
+        reset(collectorMock);
+
+        //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
+        for(int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+
+        ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
+        //All messages except the already acked 1st one should have been emitted, i.e. the replayed 2nd tuple plus the rest
+        verify(collectorMock, times(messageCount - 1)).emit(
+            eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+            anyList(),
+            remainingMessageIds.capture());
+        for(Object id : remainingMessageIds.getAllValues()) {
+            spout.ack(id);
+        }
+
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        //Commit offsets
+        spout.nextTuple();
+
+        verifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void testShouldReplayFirstTupleFailedOutOfOrder() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        //Emit the 1st tuple (it is failed later, after the 2nd tuple has been acked)
+        ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), messageIdToFail.capture());
+        reset(collectorMock);
+
+        //Emit the 2nd tuple
+        ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), messageIdToAck.capture());
+        reset(collectorMock);
+
+        //ack 2nd tuple
+        spout.ack(messageIdToAck.getValue());
+        //fail 1st tuple
+        spout.fail(messageIdToFail.getValue());
+
+        //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
+        for(int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+
+        ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
+        //All messages except the acked 2nd tuple should have been emitted, i.e. the replayed 1st tuple plus the rest
+        verify(collectorMock, times(messageCount - 1)).emit(
+            eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+            anyList(),
+            remainingIds.capture());
+        for(Object id : remainingIds.getAllValues()) {
+            spout.ack(id);
+        }
+
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        //Commit offsets
+        spout.nextTuple();
+
+        verifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void testShouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception {
+        //The spout must reemit retriable tuples, even if they fail out of order.
+        //The spout should be able to skip tuples it has already emitted when retrying messages, even if those tuples are also retries.
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        //Emit all tuples and capture their message ids in emit order
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock, times(messageCount)).emit(anyString(), anyList(), messageIds.capture());
+        reset(collectorMock);
+        //Fail tuples 5 and 3 (indices into the captured list), call nextTuple, then fail tuple 2
+        List<KafkaSpoutMessageId> capturedMessageIds = messageIds.getAllValues();
+        spout.fail(capturedMessageIds.get(5));
+        spout.fail(capturedMessageIds.get(3));
+        spout.nextTuple();
+        spout.fail(capturedMessageIds.get(2));
+
+        //Check that the spout will reemit all 3 failed tuples and no other tuples
+        ArgumentCaptor<KafkaSpoutMessageId> reemittedMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+        verify(collectorMock, times(3)).emit(anyString(), anyList(), reemittedMessageIds.capture()); //Counts every emit since the reset above
+        Set<KafkaSpoutMessageId> expectedReemitIds = new HashSet<>();
+        expectedReemitIds.add(capturedMessageIds.get(5));
+        expectedReemitIds.add(capturedMessageIds.get(3));
+        expectedReemitIds.add(capturedMessageIds.get(2));
+        assertThat("Expected reemits to be the 3 failed tuples", new HashSet<>(reemittedMessageIds.getAllValues()), is(expectedReemitIds)); //Compared as sets, so reemit order is not checked
+    }
+
+    @Test
+    public void testShouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
+        //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted
+        final int messageCount = 1;
+        prepareSpout(messageCount);
+
+        //Emit and fail the same tuple until we've reached the retry limit: maxRetries + 1 emits (the first emit plus maxRetries retries)
+        for (int i = 0; i <= maxRetries; i++) {
+            ArgumentCaptor<KafkaSpoutMessageId> messageIdFailed = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            spout.nextTuple();
+            verify(collectorMock).emit(anyString(), anyListOf(Object.class), messageIdFailed.capture());
+            KafkaSpoutMessageId msgId = messageIdFailed.getValue();
+            spout.fail(msgId);
+            assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1));
+            reset(collectorMock);
+        }
+
+        //Verify that the tuple is not emitted again once the retry limit has been reached
+        spout.nextTuple();
+        verify(collectorMock, never()).emit(anyString(), anyListOf(Object.class), anyObject());
+    }
+
+    @Test
+    public void testSpoutMustRefreshPartitionsEvenIfNotPolling() throws Exception {
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock); //The spout is initialized before populateTopicData is called
+
+        //Nothing is assigned yet, should emit nothing
+        spout.nextTuple();
+        verify(collectorMock, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+
+        SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+        Time.advanceTime(KafkaSpoutConfig.DEFAULT_PARTITION_REFRESH_PERIOD_MS + KafkaSpout.TIMER_DELAY_MS);
+
+        //After the partition refresh period has passed, the new partition should be discovered and the single message should be emitted
+        spout.nextTuple();
+        verify(collectorMock).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+    }
+
+    @Test
+    public void testOffsetMetrics() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        Map<String, Long> offsetMetric  = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset(); //Metrics before anything has been emitted; keys are TOPIC + "/<metric name>"
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue());
+        // the offset of the last available message + 1.
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue());
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalRecordsInPartitions").longValue());
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue());
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue());
+        //totalSpoutLag = totalLatestTimeOffset-totalLatestCompletedOffset
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue());
+
+        //Emit all messages and check that they are emitted. Ack the messages too
+        for (int i = 0; i < messageCount; i++) {
+            nextTuple_verifyEmitted_ack_resetCollector(i);
+        }
+
+        commitAndVerifyAllMessagesCommitted(messageCount);
+
+        offsetMetric  = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset(); //Metrics after all messages were emitted, acked and committed
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue());
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue());
+        //offset of the last emitted message (messages 0 to 9 were emitted above)
+        assertEquals(9, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue());
+        // offset where processing will resume upon spout restart
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue());
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
new file mode 100644
index 0000000..a860cef
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.junit.Test;
+
+import java.util.regex.Pattern;
+
+import static org.mockito.Mockito.when;
+
+public class KafkaSpoutTopologyDeployActivateDeactivateTest extends KafkaSpoutAbstractTest {
+
+    public KafkaSpoutTopologyDeployActivateDeactivateTest() {
+        super(2_000); // NOTE(review): presumably the commit period in ms exposed as commitOffsetPeriodMs; confirm in KafkaSpoutAbstractTest
+    }
+
+    @Override
+    KafkaSpoutConfig<String, String> createSpoutConfig() { // Builds the spout configuration used by the tests in this class
+        return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+            KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+                Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))) // The topic is given as a Pattern compiled from the TOPIC constant
+            .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST) // The strategy whose enforcement the tests in this class check
+            .build();
+    }
+
+    @Test
+    public void test_FirstPollStrategy_Earliest_NotEnforced_OnTopologyActivateDeactivate() throws Exception {
+        final int messageCount = 2;
+        prepareSpout(messageCount);
+
+        nextTuple_verifyEmitted_ack_resetCollector(0); // Process message 0 only
+
+        //Commits offsets during deactivation
+        spout.deactivate();
+
+        verifyAllMessagesCommitted(1);
+
+        consumerSpy = createConsumerSpy(); // NOTE(review): presumably a fresh spy for the consumer created on re-activation; confirm in KafkaSpoutAbstractTest
+
+        spout.activate();
+
+        nextTuple_verifyEmitted_ack_resetCollector(1); // Message 1 is expected next, i.e. the spout does not start over from message 0
+
+        commitAndVerifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void test_FirstPollStrategy_Earliest_NotEnforced_OnPartitionReassignment() throws Exception {
+        when(topologyContext.getStormId()).thenReturn("topology-1");
+
+        final int messageCount = 2;
+        prepareSpout(messageCount);
+
+        nextTuple_verifyEmitted_ack_resetCollector(0); // Process message 0 only
+
+        //Commits offsets during deactivation
+        spout.deactivate();
+
+        verifyAllMessagesCommitted(1);
+
+        // Restart topology with the same topology id, which mimics the behavior of partition reassignment
+        setUp();
+        // Initialize spout using the same populated data (i.e. same kafkaUnitRule)
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
+
+        nextTuple_verifyEmitted_ack_resetCollector(1); // Message 1 is expected next, i.e. the spout does not start over from message 0
+
+        commitAndVerifyAllMessagesCommitted(messageCount);
+    }
+
+    @Test
+    public void test_FirstPollStrategy_Earliest_Enforced_OnlyOnTopologyDeployment() throws Exception {
+        when(topologyContext.getStormId()).thenReturn("topology-1");
+
+        final int messageCount = 2;
+        prepareSpout(messageCount);
+
+        nextTuple_verifyEmitted_ack_resetCollector(0); // Process message 0 only
+
+        //Commits offsets during deactivation
+        spout.deactivate();
+
+        verifyAllMessagesCommitted(1);
+
+        // Restart topology with a different topology id
+        setUp();
+        when(topologyContext.getStormId()).thenReturn("topology-2");
+        // Initialize spout using the same populated data (i.e. same kafkaUnitRule)
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
+
+        //Under the new topology id the test expects emission to start over from message 0. Check all messages are emitted, and ack them.
+        for (int i = 0; i < messageCount; i++) {
+            nextTuple_verifyEmitted_ack_resetCollector(i);
+        }
+
+        commitAndVerifyAllMessagesCommitted(messageCount);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index a7ad4c2..b90a49d 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -15,7 +15,7 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.hamcrest.CoreMatchers.either;
 import static org.hamcrest.CoreMatchers.everyItem;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -34,7 +34,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.kafka.clients.producer.ProducerRecord;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.storm.kafka.KafkaUnitRule;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -46,6 +47,10 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.MockitoAnnotations;
 
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.never;
+
 public class MaxUncommittedOffsetTest {
 
     @Rule
@@ -59,15 +64,17 @@ public class MaxUncommittedOffsetTest {
     private final int maxUncommittedOffsets = 10;
     private final int maxPollRecords = 5;
     private final int initialRetryDelaySecs = 60;
-    private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+    private final KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
         .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
-        .setMaxPollRecords(maxPollRecords)
+        .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
         .setMaxUncommittedOffsets(maxUncommittedOffsets)
         .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
             1, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs))) //Retry once after a minute
         .build();
     private KafkaSpout<String, String> spout;
 
+
+
     @Before
     public void setUp() {
         //This is because the tests are checking that a hard cap of maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets exists
@@ -77,37 +84,24 @@ public class MaxUncommittedOffsetTest {
         //The spout must be able to reemit all retriable tuples, even if the maxPollRecords is set to a low value compared to maxUncommittedOffsets.
         assertThat("Current tests require maxPollRecords < maxUncommittedOffsets", maxPollRecords, lessThanOrEqualTo(maxUncommittedOffsets));
         MockitoAnnotations.initMocks(this);
-        this.spout = new KafkaSpout<>(spoutConfig);
+        spout = new KafkaSpout<>(spoutConfig);
     }
 
-    private void populateTopicData(String topicName, int msgCount) throws Exception {
-        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
-
-        for (int i = 0; i < msgCount; i++) {
-            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
-                topicName, Integer.toString(i),
-                Integer.toString(i));
-
-            kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
-        }
-    }
-
-    private void initializeSpout(int msgCount) throws Exception {
-        populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
-        spout.open(conf, topologyContext, collector);
-        spout.activate();
+    private void prepareSpout(int msgCount) throws Exception {
+        SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
     }
 
     private ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) throws Exception {
         assertThat("The message count is less than maxUncommittedOffsets. This test is not meaningful with this configuration.", messageCount, greaterThanOrEqualTo(maxUncommittedOffsets));
         //The spout must respect maxUncommittedOffsets when requesting/emitting tuples
-        initializeSpout(messageCount);
+        prepareSpout(messageCount);
 
         //Try to emit all messages. Ensure only maxUncommittedOffsets are emitted
         ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
         for (int i = 0; i < messageCount; i++) {
             spout.nextTuple();
-        };
+        }
         verify(collector, times(maxUncommittedOffsets)).emit(
             anyString(),
             anyList(),
@@ -128,6 +122,7 @@ public class MaxUncommittedOffsetTest {
                 spout.ack(messageId);
             }
             Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+
             spout.nextTuple();
 
             //Now check that the spout will emit another maxUncommittedOffsets messages
@@ -183,8 +178,60 @@ public class MaxUncommittedOffsetTest {
 
     @Test
     public void testNextTupleWillNotEmitMoreThanMaxUncommittedOffsetsPlusMaxPollRecordsMessages() throws Exception {
-        //The upper bound on uncommitted offsets should be maxUncommittedOffsets + maxPollRecords - 1
-        //This is reachable by emitting maxUncommittedOffsets messages, acking the first message, then polling.
+        /*
+        For each partition the spout is allowed to retry all tuples between the committed offset, and maxUncommittedOffsets ahead.
+        It is not allowed to retry tuples past that limit.
+        This makes the actual limit per partition maxUncommittedOffsets + maxPollRecords - 1,
+        reached if the tuple at the maxUncommittedOffsets limit is the earliest retriable tuple,
+        or if the spout is 1 tuple below the limit, and receives a full maxPollRecords tuples in the poll.
+         */
+
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+            //First check that maxUncommittedOffsets is respected when emitting from scratch
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+            reset(collector);
+
+            //Fail only the last tuple
+            List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
+            KafkaSpoutMessageId failedMessageId = messageIdList.get(messageIdList.size() - 1);
+            spout.fail(failedMessageId);
+
+            //Offsets 0 to maxUncommittedOffsets - 2 are pending, offset maxUncommittedOffsets - 1 is failed but not yet retriable
+            //The spout should not emit any more tuples.
+            spout.nextTuple();
+            verify(collector, never()).emit(
+                anyString(),
+                anyList(),
+                any(KafkaSpoutMessageId.class));
+
+            //Allow the failed record to retry
+            Time.advanceTimeSecs(initialRetryDelaySecs);
+            for (int i = 0; i < maxPollRecords; i++) {
+                spout.nextTuple();
+            }
+            ArgumentCaptor<KafkaSpoutMessageId> secondRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collector, times(maxPollRecords)).emit(
+                anyString(),
+                anyList(),
+                secondRunMessageIds.capture());
+            reset(collector);
+            assertThat(secondRunMessageIds.getAllValues().get(0), is(failedMessageId));
+            
+            //There should now be maxUncommittedOffsets + maxPollRecords emitted in all.
+            //Fail the last emitted tuple and verify that the spout won't retry it because it's above the emit limit.
+            spout.fail(secondRunMessageIds.getAllValues().get(secondRunMessageIds.getAllValues().size() - 1));
+            Time.advanceTimeSecs(initialRetryDelaySecs);
+            spout.nextTuple();
+            verify(collector, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+        }
+    }
+
+    @Test
+    public void testNextTupleWillAllowRetryForTuplesBelowEmitLimit() throws Exception {
+        /*
+        For each partition the spout is allowed to retry all tuples between the committed offset, and maxUncommittedOffsets ahead.
+        It must retry tuples within that limit, even if more tuples were emitted.
+         */
         try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
             //First check that maxUncommittedOffsets is respected when emitting from scratch
             ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
@@ -192,9 +239,9 @@ public class MaxUncommittedOffsetTest {
 
             failAllExceptTheFirstMessageThenCommit(messageIds);
 
-            //Offset 0 is acked, 1 to maxUncommittedOffsets - 1 are failed
+            //Offset 0 is committed, 1 to maxUncommittedOffsets - 1 are failed but not retriable
             //The spout should now emit another maxPollRecords messages
-            //This is allowed because the acked message brings the numUncommittedOffsets below the cap
+            //This is allowed because the committed message brings the numUncommittedOffsets below the cap
             for (int i = 0; i < maxUncommittedOffsets; i++) {
                 spout.nextTuple();
             }
@@ -216,18 +263,20 @@ public class MaxUncommittedOffsetTest {
             }
             assertThat("Expected the newly emitted messages to have no overlap with the first batch", secondRunOffsets.removeAll(firstRunOffsets), is(false));
 
-            //Offset 0 is acked, 1 to maxUncommittedOffsets-1 are failed, maxUncommittedOffsets to maxUncommittedOffsets + maxPollRecords-1 are emitted
-            //There are now maxUncommittedOffsets-1 + maxPollRecords records emitted past the last committed offset
-            //Advance time so the failed tuples become ready for retry, and check that the spout will emit retriable tuples as long as numNonRetriableEmittedTuples < maxUncommittedOffsets
-            
-            int numNonRetriableEmittedTuples = maxPollRecords; //The other tuples were failed and are becoming retriable
-            int allowedPolls = (int)Math.ceil((maxUncommittedOffsets - numNonRetriableEmittedTuples)/(double)maxPollRecords);
+            //Offset 0 is committed, 1 to maxUncommittedOffsets-1 are failed, maxUncommittedOffsets to maxUncommittedOffsets + maxPollRecords-1 are emitted
+            //Fail the last tuples so only offset 0 is not failed.
+            //Advance time so the failed tuples become ready for retry, and check that the spout will emit retriable tuples
+            //for all the failed tuples that are within maxUncommittedOffsets tuples of the committed offset
+            //This means 1 to maxUncommittedOffsets, but not maxUncommittedOffsets+1...maxUncommittedOffsets+maxPollRecords-1
+            for(KafkaSpoutMessageId msgId : secondRunMessageIds.getAllValues()) {
+                spout.fail(msgId);
+            }
             Time.advanceTimeSecs(initialRetryDelaySecs);
             for (int i = 0; i < numMessages; i++) {
                 spout.nextTuple();
             }
             ArgumentCaptor<KafkaSpoutMessageId> thirdRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collector, times(allowedPolls*maxPollRecords)).emit(
+            verify(collector, times(maxUncommittedOffsets)).emit(
                 anyString(),
                 anyList(),
                 thirdRunMessageIds.capture());
@@ -238,8 +287,7 @@ public class MaxUncommittedOffsetTest {
                 thirdRunOffsets.add(msgId.offset());
             }
 
-            assertThat("Expected the emitted messages to be retries of the failed tuples from the first batch", thirdRunOffsets, everyItem(isIn(firstRunOffsets)));
+            assertThat("Expected the emitted messages to be retries of the failed tuples from the first batch, plus the first failed tuple from the second batch", thirdRunOffsets, everyItem(either(isIn(firstRunOffsets)).or(is(secondRunMessageIds.getAllValues().get(0).offset()))));
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
index e97c7e1..fe3325c 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
@@ -16,10 +16,11 @@
 
 package org.apache.storm.kafka.spout;
 
+import org.apache.storm.kafka.spout.NamedTopicFilter;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
-
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
index 877efdc..335ab31 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
@@ -16,6 +16,8 @@
 
 package org.apache.storm.kafka.spout;
 
+import org.apache.storm.kafka.spout.PatternTopicFilter;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
deleted file mode 100644
index 436d052..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-package org.apache.storm.kafka.spout;
-
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
-
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.storm.kafka.KafkaUnitRule;
-import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Values;
-import org.junit.Rule;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyListOf;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
-import org.apache.storm.utils.Time;
-import org.apache.storm.utils.Time.SimulatedTime;
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.mockito.Captor;
-import org.mockito.MockitoAnnotations;
-
-public class SingleTopicKafkaSpoutTest {
-
-    @Rule
-    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
-
-    @Captor
-    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
-
-    private final TopologyContext topologyContext = mock(TopologyContext.class);
-    private final Map<String, Object> conf = new HashMap<>();
-    private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
-    private final long commitOffsetPeriodMs = 2_000;
-    private KafkaConsumer<String, String> consumerSpy;
-    private KafkaConsumerFactory<String, String> consumerFactory;
-    private KafkaSpout<String, String> spout;
-    private final int maxPollRecords = 10;
-    private final int maxRetries = 3;
-
-    @Before
-    public void setUp() {
-        MockitoAnnotations.initMocks(this);
-        KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
-            .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
-            .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
-                maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
-            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
-            .build();
-        this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
-        this.consumerFactory = new KafkaConsumerFactory<String, String>() {
-            @Override
-            public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
-                return consumerSpy;
-            }
-        
-        };
-        this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-    }
-
-    void populateTopicData(String topicName, int msgCount) throws InterruptedException, ExecutionException, TimeoutException {
-        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
-
-        for (int i = 0; i < msgCount; i++) {
-            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
-                topicName, Integer.toString(i),
-                Integer.toString(i));
-            kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
-        }
-    }
-
-    private void initializeSpout(int msgCount) throws InterruptedException, ExecutionException, TimeoutException {
-        populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
-        spout.open(conf, topologyContext, collector);
-        spout.activate();
-    }
-
-    /*
-     * Asserts that commitSync has been called once, 
-     * that there are only commits on one topic,
-     * and that the committed offset covers messageCount messages
-     */
-    private void verifyAllMessagesCommitted(long messageCount) {
-        verify(consumerSpy, times(1)).commitSync(commitCapture.capture());
-        Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
-        assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1));
-        OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue();
-        assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount));
-    }
-
-    @Test
-    public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() throws Exception {
-        try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            int messageCount = maxPollRecords * 2;
-            initializeSpout(messageCount);
-
-            //Emit all messages and fail the first one while acking the rest
-            for (int i = 0; i < messageCount; i++) {
-                spout.nextTuple();
-            }
-            ArgumentCaptor<KafkaSpoutMessageId> messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collector, times(messageCount)).emit(anyString(), anyList(), messageIdCaptor.capture());
-            List<KafkaSpoutMessageId> messageIds = messageIdCaptor.getAllValues();
-            for (int i = 1; i < messageIds.size(); i++) {
-                spout.ack(messageIds.get(i));
-            }
-            KafkaSpoutMessageId failedTuple = messageIds.get(0);
-            spout.fail(failedTuple);
-
-            //Advance the time and replay the failed tuple. 
-            reset(collector);
-            spout.nextTuple();
-            ArgumentCaptor<KafkaSpoutMessageId> failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collector).emit(anyString(), anyList(), failedIdReplayCaptor.capture());
-
-            assertThat("Expected replay of failed tuple", failedIdReplayCaptor.getValue(), is(failedTuple));
-
-            /* Ack the tuple, and commit.
-             * Since the tuple is more than max poll records behind the most recent emitted tuple, the consumer won't catch up in this poll.
-             */
-            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs);
-            spout.ack(failedIdReplayCaptor.getValue());
-            spout.nextTuple();
-            verify(consumerSpy).commitSync(commitCapture.capture());
-            
-            Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue();
-            TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
-            assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp));
-            assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount));
-
-            /* Verify that the following acked (now committed) tuples are not emitted again
-             * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened,
-             * this verifies that the spout keeps the consumer position ahead of the committed offset when committing
-             */
-            reset(collector);
-            //Just do a few polls to check that nothing more is emitted
-            for(int i = 0; i < 3; i++) {
-                spout.nextTuple();
-            }
-            verify(collector, never()).emit(anyString(), anyList(), anyObject());
-        }
-    }
-
-    @Test
-    public void shouldContinueWithSlowDoubleAcks() throws Exception {
-        try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            int messageCount = 20;
-            initializeSpout(messageCount);
-
-            //play 1st tuple
-            ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
-            spout.nextTuple();
-            verify(collector).emit(anyString(), anyList(), messageIdToDoubleAck.capture());
-            spout.ack(messageIdToDoubleAck.getValue());
-
-            //Emit some more messages
-            for(int i = 0; i < messageCount / 2; i++) {
-                spout.nextTuple();
-            }
-
-            spout.ack(messageIdToDoubleAck.getValue());
-
-            //Emit any remaining messages
-            for(int i = 0; i < messageCount; i++) {
-                spout.nextTuple();
-            }
-
-            //Verify that all messages are emitted, ack all the messages
-            ArgumentCaptor<Object> messageIds = ArgumentCaptor.forClass(Object.class);
-            verify(collector, times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM),
-                anyList(),
-                messageIds.capture());
-            for(Object id : messageIds.getAllValues()) {
-                spout.ack(id);
-            }
-
-            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
-            //Commit offsets
-            spout.nextTuple();
-
-            verifyAllMessagesCommitted(messageCount);
-        }
-    }
-
-    @Test
-    public void shouldEmitAllMessages() throws Exception {
-        try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            int messageCount = 10;
-            initializeSpout(messageCount);
-
-            //Emit all messages and check that they are emitted. Ack the messages too
-            for(int i = 0; i < messageCount; i++) {
-                spout.nextTuple();
-                ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
-                verify(collector).emit(
-                    eq(SingleTopicKafkaSpoutConfiguration.STREAM),
-                    eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
-                        Integer.toString(i),
-                        Integer.toString(i))),
-                    messageId.capture());
-                spout.ack(messageId.getValue());
-                reset(collector);
-            }
-
-            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
-            //Commit offsets
-            spout.nextTuple();
-
-            verifyAllMessagesCommitted(messageCount);
-        }
-    }
-
-    @Test
-    public void shouldReplayInOrderFailedMessages() throws Exception {
-        try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            int messageCount = 10;
-            initializeSpout(messageCount);
-
-            //play and ack 1 tuple
-            ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
-            spout.nextTuple();
-            verify(collector).emit(anyString(), anyList(), messageIdAcked.capture());
-            spout.ack(messageIdAcked.getValue());
-            reset(collector);
-
-            //play and fail 1 tuple
-            ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
-            spout.nextTuple();
-            verify(collector).emit(anyString(), anyList(), messageIdFailed.capture());
-            spout.fail(messageIdFailed.getValue());
-            reset(collector);
-
-            //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
-            for(int i = 0; i < messageCount; i++) {
-                spout.nextTuple();
-            }
-
-            ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
-            //All messages except the first acked message should have been emitted
-            verify(collector, times(messageCount - 1)).emit(
-                eq(SingleTopicKafkaSpoutConfiguration.STREAM),
-                anyList(),
-                remainingMessageIds.capture());
-            for(Object id : remainingMessageIds.getAllValues()) {
-                spout.ack(id);
-            }
-
-            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
-            //Commit offsets
-            spout.nextTuple();
-
-            verifyAllMessagesCommitted(messageCount);
-        }
-    }
-
-    @Test
-    public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
-        try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            int messageCount = 10;
-            initializeSpout(messageCount);
-
-            //play 1st tuple
-            ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
-            spout.nextTuple();
-            verify(collector).emit(anyString(), anyList(), messageIdToFail.capture());
-            reset(collector);
-
-            //play 2nd tuple
-            ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
-            spout.nextTuple();
-            verify(collector).emit(anyString(), anyList(), messageIdToAck.capture());
-            reset(collector);
-
-            //ack 2nd tuple
-            spout.ack(messageIdToAck.getValue());
-            //fail 1st tuple
-            spout.fail(messageIdToFail.getValue());
-
-            //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
-            for(int i = 0; i < messageCount; i++) {
-                spout.nextTuple();
-            }
-
-            ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
-            //All messages except the first acked message should have been emitted
-            verify(collector, times(messageCount - 1)).emit(
-                eq(SingleTopicKafkaSpoutConfiguration.STREAM),
-                anyList(),
-                remainingIds.capture());
-            for(Object id : remainingIds.getAllValues()) {
-                spout.ack(id);
-            }
-
-            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
-            //Commit offsets
-            spout.nextTuple();
-
-            verifyAllMessagesCommitted(messageCount);
-        }
-    }
-
-    @Test
-    public void shouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception {
-        //The spout must reemit retriable tuples, even if they fail out of order.
-        //The spout should be able to skip tuples it has already emitted when retrying messages, even if those tuples are also retries.
-        int messageCount = 10;
-        initializeSpout(messageCount);
-
-        //play all tuples
-        for (int i = 0; i < messageCount; i++) {
-            spout.nextTuple();
-        }
-        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        verify(collector, times(messageCount)).emit(anyString(), anyList(), messageIds.capture());
-        reset(collector);
-        //Fail tuple 5 and 3, call nextTuple, then fail tuple 2
-        List<KafkaSpoutMessageId> capturedMessageIds = messageIds.getAllValues();
-        spout.fail(capturedMessageIds.get(5));
-        spout.fail(capturedMessageIds.get(3));
-        spout.nextTuple();
-        spout.fail(capturedMessageIds.get(2));
-
-        //Check that the spout will reemit all 3 failed tuples and no other tuples
-        ArgumentCaptor<KafkaSpoutMessageId> reemittedMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        for (int i = 0; i < messageCount; i++) {
-            spout.nextTuple();
-        }
-        verify(collector, times(3)).emit(anyString(), anyList(), reemittedMessageIds.capture());
-        Set<KafkaSpoutMessageId> expectedReemitIds = new HashSet<>();
-        expectedReemitIds.add(capturedMessageIds.get(5));
-        expectedReemitIds.add(capturedMessageIds.get(3));
-        expectedReemitIds.add(capturedMessageIds.get(2));
-        assertThat("Expected reemits to be the 3 failed tuples", new HashSet<>(reemittedMessageIds.getAllValues()), is(expectedReemitIds));
-    }
-
-    @Test
-    public void shouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
-        //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted
-        int messageCount = 1;
-        initializeSpout(messageCount);
-
-        //Emit and fail the same tuple until we've reached retry limit
-        for (int i = 0; i <= maxRetries; i++) {
-            ArgumentCaptor<KafkaSpoutMessageId> messageIdFailed = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            spout.nextTuple();
-            verify(collector).emit(anyString(), anyListOf(Object.class), messageIdFailed.capture());
-            KafkaSpoutMessageId msgId = messageIdFailed.getValue();
-            spout.fail(msgId);
-            assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1));
-            reset(collector);
-        }
-
-        //Verify that the tuple is not emitted again
-        spout.nextTuple();
-        verify(collector, never()).emit(anyString(), anyListOf(Object.class), anyObject());
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
new file mode 100644
index 0000000..f5b9423
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.KafkaUnit;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.mockito.ArgumentCaptor;
+
+public class SingleTopicKafkaUnitSetupHelper {
+
+    /**
+     * Creates the topic on the given KafkaUnit instance and sends msgCount records to it; record i uses Integer.toString(i) as both key and value.
+     *
+     * @param kafkaUnit The KafkaUnit instance used to create the topic and send the messages
+     * @param topicName The topic to create and produce messages for
+     * @param msgCount The number of messages to produce
+     */
+    public static void populateTopicData(KafkaUnit kafkaUnit, String topicName, int msgCount) throws Exception {
+        kafkaUnit.createTopic(topicName);
+        
+        for (int i = 0; i < msgCount; i++) {
+            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
+                topicName, Integer.toString(i),
+                Integer.toString(i));
+            kafkaUnit.sendMessage(producerRecord);
+        }
+    }
+
+    /**
+     * Asserts that commitSync has been called exactly once on the given consumer spy,
+     * that the captured commit contains an entry for exactly one topic partition,
+     * and that the committed offset for that partition equals messageCount.
+     */
+    public static <K, V> void verifyAllMessagesCommitted(KafkaConsumer<K, V> consumerSpy,
+        ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture, long messageCount) {
+        verify(consumerSpy, times(1)).commitSync(commitCapture.capture());
+        Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
+        assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1));
+        OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue();
+        assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount));
+    }
+
+    /**
+     * Stubs the TopologyContext mock to report task index 0 and a single component task, then opens and activates the spout.
+     *
+     * @param <K> Kafka key type
+     * @param <V> Kafka value type
+     * @param spout The spout to prepare
+     * @param topoConf The topology configuration passed to the spout's open method
+     * @param topoContextMock The TopologyContext mock
+     * @param collectorMock The output collector mock
+     */
+    public static <K, V> void initializeSpout(KafkaSpout<K, V> spout, Map<String, Object> topoConf, TopologyContext topoContextMock,
+        SpoutOutputCollector collectorMock) throws Exception {
+        when(topoContextMock.getThisTaskIndex()).thenReturn(0);
+        when(topoContextMock.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0));
+        spout.open(topoConf, topoContextMock, collectorMock);
+        spout.activate();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
new file mode 100644
index 0000000..3aad61e
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mockingDetails;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class SpoutWithMockedConsumerSetupHelper {
+
+    /**
+     * Creates, opens and activates a KafkaSpout using a mocked consumer. The subscription should be a mock object, since this method skips
+     * the subscription and instead just configures the mocked consumer to act as if the specified partitions are assigned to it.
+     *
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param spoutConfig The spout config to use
+     * @param topoConf The topo conf to pass to the spout
+     * @param contextMock The topo context to pass to the spout
+     * @param collectorMock The mocked collector to pass to the spout
+     * @param consumerMock The mocked consumer
+     * @param assignedPartitions The partitions to assign to this spout. The consumer will act like these partitions are assigned to it.
+     * @return The spout
+     * @throws IllegalStateException if the subscription of {@code spoutConfig} is not a Mockito mock
+     */
+    public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf,
+        TopologyContext contextMock, SpoutOutputCollector collectorMock, final KafkaConsumer<K, V> consumerMock, TopicPartition... assignedPartitions) {
+        Subscription subscriptionMock = spoutConfig.getSubscription();
+        if (!mockingDetails(subscriptionMock).isMock()) {
+            throw new IllegalStateException("Use a mocked subscription when using this method, it helps avoid complex stubbing");
+        }
+
+        final Set<TopicPartition> assignedPartitionsSet = new HashSet<>(Arrays.asList(assignedPartitions));
+        // Pretend the partitions are assigned: report them from assignment() and notify the listener passed to subscribe().
+        when(consumerMock.assignment()).thenReturn(assignedPartitionsSet);
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                ConsumerRebalanceListener listener = (ConsumerRebalanceListener) invocation.getArguments()[1];
+                listener.onPartitionsAssigned(assignedPartitionsSet);
+                return null;
+            }
+        }).when(subscriptionMock).subscribe(any(KafkaConsumer.class), any(ConsumerRebalanceListener.class), any(TopologyContext.class));
+
+        KafkaConsumerFactory<K, V> consumerFactory = new KafkaConsumerFactory<K, V>() {
+            @Override
+            public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+                return consumerMock;
+            }
+        };
+        KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+        spout.open(topoConf, contextMock, collectorMock);
+        spout.activate();
+
+        return spout;
+    }
+
+    /**
+     * Creates sequential dummy records with null keys and values.
+     *
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param topic The topic partition to create records for
+     * @param startingOffset The starting offset of the records
+     * @param numRecords The number of records to create
+     * @return The dummy records
+     */
+    public static <K, V> List<ConsumerRecord<K, V>> createRecords(TopicPartition topic, long startingOffset, int numRecords) {
+        List<ConsumerRecord<K, V>> recordsForPartition = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            recordsForPartition.add(new ConsumerRecord<K, V>(topic.topic(), topic.partition(), startingOffset + i, null, null));
+        }
+        return recordsForPartition;
+    }
+
+    /**
+     * Creates messages for the input offsets, emits the messages by calling nextTuple once per offset and returns the captured message ids
+     *
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param spout The spout
+     * @param consumerMock The consumer used by the spout
+     * @param expectedEmits The total number of emit calls the collector mock is verified to have received
+     * @param collectorMock The collector used by the spout
+     * @param partition The partition to emit messages on
+     * @param offsetsToEmit The offsets to emit
+     * @return The message ids emitted by the spout during the nextTuple calls
+     */
+    public static <K, V> List<KafkaSpoutMessageId> pollAndEmit(KafkaSpout<K, V> spout, KafkaConsumer<K, V> consumerMock, int expectedEmits, SpoutOutputCollector collectorMock, TopicPartition partition, int... offsetsToEmit) {
+        return pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, Collections.singletonMap(partition, offsetsToEmit));
+    }
+
+    /**
+     * Creates messages for the input offsets, emits the messages by calling nextTuple once per offset and returns the captured message ids
+     *
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param spout The spout
+     * @param consumerMock The consumer used by the spout
+     * @param expectedEmits The total number of emit calls the collector mock is verified to have received
+     * @param collectorMock The collector used by the spout
+     * @param offsetsToEmit The offsets to emit per partition
+     * @return The message ids emitted by the spout during the nextTuple calls
+     */
+    public static <K, V> List<KafkaSpoutMessageId> pollAndEmit(KafkaSpout<K, V> spout, KafkaConsumer<K, V> consumerMock, int expectedEmits, SpoutOutputCollector collectorMock, Map<TopicPartition, int[]> offsetsToEmit) {
+        int totalOffsets = 0;
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+        for (Entry<TopicPartition, int[]> entry : offsetsToEmit.entrySet()) {
+            TopicPartition tp = entry.getKey();
+            List<ConsumerRecord<K, V>> tpRecords = new ArrayList<>();
+            for (int offset : entry.getValue()) {
+                tpRecords.add(new ConsumerRecord<K, V>(tp.topic(), tp.partition(), offset, null, null));
+                totalOffsets++;
+            }
+            records.put(tp, tpRecords);
+        }
+
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(records));
+
+        for (int i = 0; i < totalOffsets; i++) {
+            spout.nextTuple();
+        }
+
+        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock, times(expectedEmits)).emit(anyString(), anyList(), messageIds.capture());
+        return messageIds.getAllValues();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index d5c052b..b178687 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -17,17 +17,20 @@
  */
 package org.apache.storm.kafka.spout.builders;
 
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
 
 import java.util.List;
-
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.kafka.spout.Func;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.kafka.spout.Subscription;
 import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
@@ -46,7 +49,7 @@ public class SingleTopicKafkaSpoutConfiguration {
 
     public static StormTopology getTopologyKafkaSpout(int port) {
         final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfigBuilder(port).build()), 1);
+        tp.setSpout("kafka_spout", new KafkaSpout<>(SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(port).build()), 1);
         tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
         return tp.createTopology();
     }
@@ -57,21 +60,33 @@ public class SingleTopicKafkaSpoutConfiguration {
             return new Values(r.topic(), r.key(), r.value());
         }
     };
-    
-    public static KafkaSpoutConfig.Builder<String,String> getKafkaSpoutConfigBuilder(int port) {
-        return KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)
-                .setRecordTranslator(TOPIC_KEY_VALUE_FUNC,
-                        new Fields("topic", "key", "value"), STREAM)
-                .setGroupId("kafkaSpoutTestGroup")
-                .setMaxPollRecords(5)
-                .setRetry(getRetryService())
-                .setOffsetCommitPeriodMs(10_000)
-                .setFirstPollOffsetStrategy(EARLIEST)
-                .setMaxUncommittedOffsets(250)
-                .setPollTimeoutMs(1000);
+
+    public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(int port) {
+        return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC));
+    }
+
+    public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(Subscription subscription, int port) {
+        return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<String, String>("127.0.0.1:" + port, subscription));
     }
-        
-    protected static KafkaSpoutRetryService getRetryService() {
-        return KafkaSpoutConfig.UNIT_TEST_RETRY_SERVICE;
+
+    public static KafkaSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {
+        return config
+            .setRecordTranslator(TOPIC_KEY_VALUE_FUNC,
+                new Fields("topic", "key", "value"), STREAM)
+            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
+            .setRetry(getNoDelayRetryService())
+            .setOffsetCommitPeriodMs(10_000)
+            .setFirstPollOffsetStrategy(EARLIEST)
+            .setMaxUncommittedOffsets(250)
+            .setPollTimeoutMs(1000);
+    }
+
+    protected static KafkaSpoutRetryService getNoDelayRetryService() {
+        /**
+         * Retry in a tight loop (keeps unit tests fast).
+         */
+        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
index abbacf9..9972d4c 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
@@ -18,10 +18,12 @@ package org.apache.storm.kafka.spout.internal;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import java.util.NoSuchElementException;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
@@ -30,6 +32,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 public class OffsetManagerTest {
+    private static final String COMMIT_METADATA = "{\"topologyId\":\"tp1\",\"taskId\":3,\"threadName\":\"Thread-20\"}";
 
     @Rule
     public ExpectedException expect = ExpectedException.none();
@@ -56,12 +59,12 @@ public class OffsetManagerTest {
         manager.addToAckMsgs(getMessageId(initialFetchOffset + 2));
         manager.addToAckMsgs(getMessageId(initialFetchOffset + 6));
         
-        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset().offset(), is(initialFetchOffset + 3));
+        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(COMMIT_METADATA).offset(), is(initialFetchOffset + 3));
         
         manager.addToAckMsgs(getMessageId(initialFetchOffset + 5));
         
-        assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", 
-            manager.findNextCommitOffset().offset(), is(initialFetchOffset + 7));
+        assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted",
+            manager.findNextCommitOffset(COMMIT_METADATA), is(new OffsetAndMetadata(initialFetchOffset + 7, COMMIT_METADATA)));
     }
     
     @Test
@@ -71,17 +74,17 @@ public class OffsetManagerTest {
         manager.addToEmitMsgs(initialFetchOffset + 6);
         manager.addToAckMsgs(getMessageId(initialFetchOffset + 6));
         
-        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(), is(nullValue()));
+        assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(COMMIT_METADATA), is(nullValue()));
         
         manager.addToAckMsgs(getMessageId(initialFetchOffset + 5));
         
         assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted", 
-            manager.findNextCommitOffset().offset(), is(initialFetchOffset + 7));
+            manager.findNextCommitOffset(COMMIT_METADATA), is(new OffsetAndMetadata(initialFetchOffset + 7, COMMIT_METADATA)));
     }
 
     @Test
     public void testFindNextCommittedOffsetWithNoAcks() {
-        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
         assertThat("There shouldn't be a next commit offset when nothing has been acked", nextCommitOffset, is(nullValue()));
     }
 
@@ -92,7 +95,7 @@ public class OffsetManagerTest {
          * lastProcessedMessageOffset + 1. "
          */
         emitAndAckMessage(getMessageId(initialFetchOffset));
-        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
         assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 1));
     }
 
@@ -100,7 +103,7 @@ public class OffsetManagerTest {
     public void testFindNextCommitOffsetWithMultipleOutOfOrderAcks() {
         emitAndAckMessage(getMessageId(initialFetchOffset + 1));
         emitAndAckMessage(getMessageId(initialFetchOffset));
-        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
         assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 2));
     }
 
@@ -109,7 +112,7 @@ public class OffsetManagerTest {
         emitAndAckMessage(getMessageId(initialFetchOffset + 2));
         manager.addToEmitMsgs(initialFetchOffset + 1);
         emitAndAckMessage(getMessageId(initialFetchOffset));
-        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
         assertThat("The next commit offset should cover the sequential acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1));
     }
 
@@ -123,7 +126,7 @@ public class OffsetManagerTest {
          */
         emitAndAckMessage(getMessageId(initialFetchOffset + 2));
         emitAndAckMessage(getMessageId(initialFetchOffset));
-        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
         assertThat("The next commit offset should cover all the acked offsets, since the offset in the gap hasn't been emitted and doesn't exist",
             nextCommitOffset.offset(), is(initialFetchOffset + 3));
     }
@@ -132,7 +135,7 @@ public class OffsetManagerTest {
     public void testFindNextCommitOffsetWithUnackedOffsetGap() {
         manager.addToEmitMsgs(initialFetchOffset + 1);
         emitAndAckMessage(getMessageId(initialFetchOffset));
-        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+        OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
         assertThat("The next commit offset should cover the contiguously acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1));
     }
     
@@ -140,7 +143,7 @@ public class OffsetManagerTest {
     public void testFindNextCommitOffsetWhenTooLowOffsetIsAcked() {
         OffsetManager startAtHighOffsetManager = new OffsetManager(testTp, 10);
         emitAndAckMessage(getMessageId(0));
-        OffsetAndMetadata nextCommitOffset = startAtHighOffsetManager.findNextCommitOffset();
+        OffsetAndMetadata nextCommitOffset = startAtHighOffsetManager.findNextCommitOffset(COMMIT_METADATA);
         assertThat("Acking an offset earlier than the committed offset should have no effect", nextCommitOffset, is(nullValue()));
     }
     
@@ -170,4 +173,24 @@ public class OffsetManagerTest {
         manager.addToAckMsgs(msgId);
     }
 
+    @Test
+    public void testGetNthUncommittedOffsetAfterCommittedOffset() { 
+        manager.addToEmitMsgs(initialFetchOffset + 1);
+        manager.addToEmitMsgs(initialFetchOffset + 2);
+        manager.addToEmitMsgs(initialFetchOffset + 5);
+        manager.addToEmitMsgs(initialFetchOffset + 30);
+        
+        assertThat("The third uncommitted offset should be 5", manager.getNthUncommittedOffsetAfterCommittedOffset(3), is(initialFetchOffset + 5L));
+        assertThat("The fourth uncommitted offset should be 30", manager.getNthUncommittedOffsetAfterCommittedOffset(4), is(initialFetchOffset + 30L));
+        
+        expect.expect(NoSuchElementException.class);
+        manager.getNthUncommittedOffsetAfterCommittedOffset(5);
+    }
+
+    @Test
+    public void testCommittedFlagSetOnCommit() throws Exception {
+        assertFalse(manager.hasCommitted());
+        manager.commit(mock(OffsetAndMetadata.class));
+        assertTrue(manager.hasCommitted());
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
new file mode 100644
index 0000000..9a2a682
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.ManualPartitionSubscription;
+import org.apache.storm.kafka.spout.ManualPartitioner;
+import org.apache.storm.kafka.spout.TopicFilter;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+public class ManualPartitionSubscriptionTest {
+    /** Checks that refreshing the assignment revokes the old partitions via the listener before the new ones are assigned to the consumer. */
+    @Test
+    public void testCanReassignPartitions() {
+        ManualPartitioner partitionerMock = mock(ManualPartitioner.class);
+        TopicFilter filterMock = mock(TopicFilter.class);
+        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+        ConsumerRebalanceListener listenerMock = mock(ConsumerRebalanceListener.class);
+        TopologyContext contextMock = mock(TopologyContext.class);
+        ManualPartitionSubscription subscription = new ManualPartitionSubscription(partitionerMock, filterMock);
+        
+        List<TopicPartition> onePartition = Collections.singletonList(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
+        List<TopicPartition> twoPartitions = new ArrayList<>();
+        twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
+        twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1));
+        when(partitionerMock.partition(anyList(), any(TopologyContext.class)))
+            .thenReturn(onePartition)
+            .thenReturn(twoPartitions);
+        
+        //Set the first assignment (the partitioner mock returns onePartition on its first call)
+        subscription.subscribe(consumerMock, listenerMock, contextMock);
+        
+        InOrder inOrder = inOrder(consumerMock, listenerMock);
+        inOrder.verify(consumerMock).assign(new HashSet<>(onePartition));
+        inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(onePartition));
+        
+        reset(consumerMock, listenerMock);
+        
+        when(consumerMock.assignment()).thenReturn(new HashSet<>(onePartition));
+        
+        //Update to set the second assignment (the partitioner mock now returns twoPartitions)
+        subscription.refreshAssignment();
+        
+        //The partition revocation hook must be called before the new partitions are assigned to the consumer,
+        //to allow the revocation hook to commit offsets for the revoked partitions.
+        inOrder.verify(listenerMock).onPartitionsRevoked(new HashSet<>(onePartition));
+        inOrder.verify(consumerMock).assign(new HashSet<>(twoPartitions));
+        inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(twoPartitions));
+    }
+    
+}