Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/07 21:05:25 UTC

[3/7] storm git commit: STORM-2936 Overwrite latest storm-kafka-client 1.x-branch into 1.1.x-branch

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
index 447f8c4..dbba04b 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -15,9 +15,6 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
@@ -26,35 +23,36 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.never;
 
-import java.util.HashSet;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
 import org.mockito.InOrder;
 
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.eq;
+
 public class KafkaSpoutEmitTest {
 
     private final long offsetCommitPeriodMs = 2_000;
@@ -63,50 +61,26 @@ public class KafkaSpoutEmitTest {
     private final Map<String, Object> conf = new HashMap<>();
     private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
     private KafkaConsumer<String, String> consumerMock;
-    private KafkaSpout<String, String> spout;
-    private KafkaSpoutConfig spoutConfig;
+    private KafkaSpoutConfig<String, String> spoutConfig;
 
-    private void setupSpout(Set<TopicPartition> assignedPartitions) {
-        spoutConfig = getKafkaSpoutConfigBuilder(-1)
+    @Before
+    public void setUp() {
+        spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
             .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
             .build();
-
         consumerMock = mock(KafkaConsumer.class);
-        KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactory<String, String>() {
-            @Override
-            public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
-                return consumerMock;
-            }
-        };
-
-        //Set up a spout listening to 1 topic partition
-        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
-        spout.open(conf, contextMock, collectorMock);
-        spout.activate();
-
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
-        //Assign partitions to the spout
-        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
-        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
     }
 
     @Test
     public void testNextTupleEmitsAtMostOneTuple() {
         //The spout should emit at most one message per call to nextTuple
         //This is necessary for Storm to be able to throttle the spout according to maxSpoutPending
-        setupSpout(Collections.singleton(partition));
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
         Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
-        List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
-        for (int i = 0; i < 10; i++) {
-            recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
-        }
-        records.put(partition, recordsForPartition);
+        records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 10));
 
         when(consumerMock.poll(anyLong()))
-            .thenReturn(new ConsumerRecords(records));
+            .thenReturn(new ConsumerRecords<>(records));
 
         spout.nextTuple();
 
@@ -114,29 +88,26 @@ public class KafkaSpoutEmitTest {
     }
 
     @Test
-    public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() {
+    public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() throws IOException {
         //The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded
-        
+
         //Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            setupSpout(Collections.singleton(partition));
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
-            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
-            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
-                //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
-                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
-            }
-            records.put(partition, recordsForPartition);
+            int numRecords = spoutConfig.getMaxUncommittedOffsets();
+            //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+            records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, numRecords));
 
             when(consumerMock.poll(anyLong()))
-                .thenReturn(new ConsumerRecords(records));
+                .thenReturn(new ConsumerRecords<>(records));
 
-            for (int i = 0; i < recordsForPartition.size(); i++) {
+            for (int i = 0; i < numRecords; i++) {
                 spout.nextTuple();
             }
 
             ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
+            verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), messageIds.capture());
 
             for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
                 spout.fail(messageId);
@@ -146,16 +117,16 @@ public class KafkaSpoutEmitTest {
 
             Time.advanceTime(50);
             //No backoff for test retry service, just check that messages will retry immediately
-            for (int i = 0; i < recordsForPartition.size(); i++) {
+            for (int i = 0; i < numRecords; i++) {
                 spout.nextTuple();
             }
 
             ArgumentCaptor<KafkaSpoutMessageId> retryMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), retryMessageIds.capture());
+            verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), retryMessageIds.capture());
 
             //Verify that the poll started at the earliest retriable tuple offset
             List<Long> failedOffsets = new ArrayList<>();
-            for(KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+            for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
                 failedOffsets.add(msgId.offset());
             }
             InOrder inOrder = inOrder(consumerMock);
@@ -163,93 +134,80 @@ public class KafkaSpoutEmitTest {
             inOrder.verify(consumerMock).poll(anyLong());
         }
     }
-    
+
     @Test
-    public void testNextTupleEmitsAtMostMaxUncommittedOffsetsPlusMaxPollRecordsWhenRetryingTuples() {
-        /*
-        The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded.
-        numUncommittedOffsets is equal to numNonRetriableEmittedTuples + numRetriableTuples.
-        The spout will only emit if numUncommittedOffsets - numRetriableTuples < maxUncommittedOffsets (i.e. numNonRetriableEmittedTuples < maxUncommittedOffsets)
-        This means that the latest offset a poll can start at for a retriable partition,
-        counting from the last committed offset, is maxUncommittedOffsets,
-        where there are maxUncommittedOffsets - 1 uncommitted tuples "to the left".
-        If the retry poll starts at that offset, it at most emits the retried tuple plus maxPollRecords - 1 new tuples.
-        The limit on uncommitted offsets for one partition is therefore maxUncommittedOffsets + maxPollRecords - 1.
-        
-        It is only necessary to test this for a single partition, because partitions can't contribute negatively to numNonRetriableEmittedTuples,
-        so if the limit holds for one partition, it will also hold for each individual partition when multiple are involved.
-        
-        This makes the actual limit numPartitions * (maxUncommittedOffsets + maxPollRecords - 1)
-         */
-        
-        //Emit maxUncommittedOffsets messages, and fail only the last. Then ensure that the spout will allow no more than maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets when retrying
+    public void testSpoutWillSkipPartitionsAtTheMaxUncommittedOffsetsLimit() {
+        //This verifies that partitions can't prevent each other from retrying tuples due to the maxUncommittedOffsets limit.
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            setupSpout(Collections.singleton(partition));
-            
-            Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPollRecords = new HashMap<>();
-            List<ConsumerRecord<String, String>> firstPollRecordsForPartition = new ArrayList<>();
-            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
-                //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
-                firstPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
-            }
-            firstPollRecords.put(partition, firstPollRecordsForPartition);
-            
-            int maxPollRecords = 5;
-            Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPollRecords = new HashMap<>();
-            List<ConsumerRecord<String, String>> secondPollRecordsForPartition = new ArrayList<>();
-            for(int i = 0; i < maxPollRecords; i++) {
-                secondPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
-            }
-            secondPollRecords.put(partition, secondPollRecordsForPartition);
+            TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo);
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+            records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()));
+            records.put(partitionTwo, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partitionTwo, 0, spoutConfig.getMaxUncommittedOffsets() + 1));
+            int numMessages = spoutConfig.getMaxUncommittedOffsets()*2 + 1;
 
             when(consumerMock.poll(anyLong()))
-                .thenReturn(new ConsumerRecords(firstPollRecords))
-                .thenReturn(new ConsumerRecords(secondPollRecords));
+                .thenReturn(new ConsumerRecords<>(records));
 
-            for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() + maxPollRecords; i++) {
+            for (int i = 0; i < numMessages; i++) {
                 spout.nextTuple();
             }
 
             ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(firstPollRecordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
-
-            KafkaSpoutMessageId failedMessageId = messageIds.getAllValues().get(messageIds.getAllValues().size() - 1);
-            spout.fail(failedMessageId);
-
-            reset(collectorMock);
-
-            //Now make the single failed tuple retriable
-            Time.advanceTime(50);
-            //The spout should allow another poll since there are now only maxUncommittedOffsets - 1 nonretriable tuples
-            for (int i = 0; i < firstPollRecordsForPartition.size() + maxPollRecords; i++) {
-                spout.nextTuple();
+            verify(collectorMock, times(numMessages)).emit(anyString(), anyList(), messageIds.capture());
+            
+            //Now fail a tuple on partition one and verify that it is allowed to retry, because the failed tuple is below the maxUncommittedOffsets limit
+            KafkaSpoutMessageId failedMessageIdPartitionOne = null;
+            for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+                if (msgId.partition() == partition.partition()) {
+                    failedMessageIdPartitionOne = msgId;
+                    break;
+                }
+            }
+            
+            spout.fail(failedMessageIdPartitionOne);
+
+            //Also fail the last tuple from partition two. Since the failed tuple is beyond the maxUncommittedOffsets limit, it should not be retried until earlier messages are acked.
+            KafkaSpoutMessageId failedMessageIdPartitionTwo = null;
+            for (KafkaSpoutMessageId msgId: messageIds.getAllValues()) {
+                if (msgId.partition() == partitionTwo.partition()) {
+                    if (failedMessageIdPartitionTwo != null) {
+                        if (msgId.offset() >= failedMessageIdPartitionTwo.offset()) {
+                            failedMessageIdPartitionTwo = msgId;
+                        }
+                    } else {
+                        failedMessageIdPartitionTwo = msgId;
+                    }
+                }
             }
 
-            ArgumentCaptor<KafkaSpoutMessageId> retryBatchMessageIdsCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(maxPollRecords)).emit(anyString(), anyList(), retryBatchMessageIdsCaptor.capture());
+            spout.fail(failedMessageIdPartitionTwo);
+            
             reset(collectorMock);
             
-            //Check that the consumer started polling at the failed tuple offset
+            Time.advanceTime(50);
+            when(consumerMock.poll(anyLong()))
+                .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, failedMessageIdPartitionOne.offset(), 1))));
+            
+            spout.nextTuple();
+            
+            verify(collectorMock, times(1)).emit(anyString(), anyList(), anyObject());
+            
             InOrder inOrder = inOrder(consumerMock);
-            inOrder.verify(consumerMock).seek(partition, failedMessageId.offset());
+            inOrder.verify(consumerMock).seek(partition, failedMessageIdPartitionOne.offset());
+            //Should not seek on the paused partition
+            inOrder.verify(consumerMock, never()).seek(eq(partitionTwo), anyLong());
+            inOrder.verify(consumerMock).pause(Collections.singleton(partitionTwo));
             inOrder.verify(consumerMock).poll(anyLong());
+            inOrder.verify(consumerMock).resume(Collections.singleton(partitionTwo));
             
-            //Now fail all except one of the last batch, and check that the spout won't reemit any tuples because there are more than maxUncommittedOffsets nonretriable tuples
-            Time.advanceTime(50);
-            List<KafkaSpoutMessageId> retryBatchMessageIds = retryBatchMessageIdsCaptor.getAllValues();
-            KafkaSpoutMessageId firstTupleFromRetryBatch = retryBatchMessageIds.remove(0);
-            for(KafkaSpoutMessageId msgId : retryBatchMessageIds) {
-                spout.fail(msgId);
-            }
-            for (int i = 0; i < firstPollRecordsForPartition.size() + maxPollRecords; i++) {
-                spout.nextTuple();
-            }
-            verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
+            reset(collectorMock);
             
-            //Fail the last tuple, which brings the number of nonretriable tuples back under the limit, and check that the spout polls again
-            spout.fail(firstTupleFromRetryBatch);
+            //Now also check that no more tuples are polled for, since both partitions are at their limits
             spout.nextTuple();
-            verify(collectorMock, times(1)).emit(anyString(), anyList(), anyObject());
+
+            verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
         }
     }
 
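Note on the helper used above: the inlined spout setup and record-creation loops removed in this hunk are replaced by calls to SpoutWithMockedConsumerSetupHelper, a test helper introduced elsewhere in this commit and not shown in this message. A minimal sketch of its createRecords method, assuming it simply generalizes the inlined loop it replaces (the class name suffix and the null key/value are illustrative only):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class SpoutWithMockedConsumerSetupHelperSketch {
        //Build numRecords sequential records for one partition, starting at startingOffset
        public static <K, V> List<ConsumerRecord<K, V>> createRecords(TopicPartition partition, long startingOffset, int numRecords) {
            List<ConsumerRecord<K, V>> records = new ArrayList<>();
            for (int i = 0; i < numRecords; i++) {
                records.add(new ConsumerRecord<K, V>(partition.topic(), partition.partition(), startingOffset + i, null, null));
            }
            return records;
        }
    }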

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
new file mode 100644
index 0000000..09f7fc5
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
+
+import static org.hamcrest.CoreMatchers.is;
+
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+
+public class KafkaSpoutLogCompactionSupportTest {
+
+    private final long offsetCommitPeriodMs = 2_000;
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpoutConfig<String, String> spoutConfig;
+
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .build();
+        consumerMock = mock(KafkaConsumer.class);
+    }
+
+    @Test
+    public void testCommitSuccessWithOffsetVoids() {
+        //Verify that the commit logic can handle offset voids due to log compaction
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+            // Offsets emitted are 0,1,2,3,4,<void>,8,9
+            recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 5));
+            recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 8, 2));
+            records.put(partition, recordsForPartition);
+
+            when(consumerMock.poll(anyLong()))
+                    .thenReturn(new ConsumerRecords<>(records));
+
+            for (int i = 0; i < recordsForPartition.size(); i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
+
+            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+                spout.ack(messageId);
+            }
+
+            // Advance time and then trigger first call to kafka consumer commit; the commit must progress to offset 9
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            when(consumerMock.poll(anyLong()))
+                    .thenReturn(new ConsumerRecords<String, String>(Collections.<TopicPartition, List<ConsumerRecord<String, String>>>emptyMap()));
+            spout.nextTuple();
+
+            InOrder inOrder = inOrder(consumerMock);
+            inOrder.verify(consumerMock).commitSync(commitCapture.capture());
+            inOrder.verify(consumerMock).poll(anyLong());
+
+            //verify that Offset 10 was last committed offset, since this is the offset the spout should resume at
+            Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
+            assertTrue(commits.containsKey(partition));
+            assertEquals(10, commits.get(partition).offset());
+        }
+    }
+    
+    @Test
+    public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAway() {
+        /*
+          Verify that failed offsets will only retry if the corresponding message exists. 
+          When log compaction is enabled in Kafka it is possible that a tuple can fail, 
+          and then be impossible to retry because the message in Kafka has been deleted.
+          The spout needs to quietly ack such tuples to allow commits to progress past the deleted offset.
+         */
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo);
+            
+            List<KafkaSpoutMessageId> firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper
+                .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2);
+            reset(collectorMock);
+            List<KafkaSpoutMessageId> secondPartitionMsgIds = SpoutWithMockedConsumerSetupHelper
+                .pollAndEmit(spout, consumerMock, 3, collectorMock, partitionTwo, 0, 1, 2);
+            reset(collectorMock);
+            
+            for(int i = 0; i < 3; i++) {
+                spout.fail(firstPartitionMsgIds.get(i));
+                spout.fail(secondPartitionMsgIds.get(i));
+            }
+            
+            Time.advanceTime(50);
+            
+            //The failed tuples are ready for retry. Make it appear like 0 and 1 on the first partition were compacted away.
+            //In this case the second partition acts as control to verify that we only skip past offsets that are no longer present.
+            Map<TopicPartition, int[]> retryOffsets = new HashMap<>();
+            retryOffsets.put(partition, new int[] {2});
+            retryOffsets.put(partitionTwo, new int[] {0, 1, 2});
+            int expectedEmits = 4; //2 on first partition, 0-2 on second partition
+            List<KafkaSpoutMessageId> retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, retryOffsets);
+            
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+            
+            verify(consumerMock).commitSync(commitCapture.capture());
+            Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue();
+            assertThat(committed.keySet(), is(Collections.singleton(partition)));
+            assertThat("The first partition should have committed up to the first retriable tuple that is not missing", committed.get(partition).offset(), is(2L));
+            
+            for(KafkaSpoutMessageId msgId : retryMessageIds) {
+                spout.ack(msgId);
+            }
+            
+            //The spout should now commit all the offsets, since all offsets are either acked or were missing when retrying
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+
+            verify(consumerMock, times(2)).commitSync(commitCapture.capture());
+            committed = commitCapture.getValue();
+            assertThat(committed, hasKey(partition));
+            assertThat(committed, hasKey(partitionTwo));
+            assertThat(committed.get(partition).offset(), is(3L));
+            assertThat(committed.get(partitionTwo).offset(), is(3L));
+        }
+    }
+    
+    @Test
+    public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAwayWithoutAckingPendingTuples() {
+        //Demonstrate that the spout doesn't ack pending tuples when skipping compacted tuples. The pending tuples should be allowed to finish normally.
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+            
+            List<KafkaSpoutMessageId> firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper
+                .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2);
+            reset(collectorMock);
+            
+            spout.fail(firstPartitionMsgIds.get(0));            
+            spout.fail(firstPartitionMsgIds.get(2));
+            
+            Time.advanceTime(50);
+            
+            //The failed tuples are ready for retry. Make it appear like 0 and 1 were compacted away.
+            List<KafkaSpoutMessageId> retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, 1, collectorMock, partition, 2);
+            for(KafkaSpoutMessageId msgId : retryMessageIds) {
+                spout.ack(msgId);
+            }
+            
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+            
+            verify(consumerMock).commitSync(commitCapture.capture());
+            Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue();
+            assertThat(committed.keySet(), is(Collections.singleton(partition)));
+            assertThat("The first partition should have committed the missing offset, but no further since the next tuple is pending",
+                committed.get(partition).offset(), is(1L));
+            
+            spout.ack(firstPartitionMsgIds.get(1));
+            
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+            
+            verify(consumerMock, times(2)).commitSync(commitCapture.capture());
+            committed = commitCapture.getValue();
+            assertThat(committed.keySet(), is(Collections.singleton(partition)));
+            assertThat("The first partition should have committed all offsets", committed.get(partition).offset(), is(3L));
+        }
+    }
+
+}
\ No newline at end of file
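The offset assertions in the log compaction tests above (for example, acking offsets 0-4 and 8-9 and expecting a committed offset of 10) rely on Kafka's commit convention: the committed offset is the next offset the consumer should read, not the last processed one. A minimal illustration of that convention, assuming a consumer and partition are already in scope and lastAckedOffset is the highest contiguously acked offset:

    //Commit lastAckedOffset + 1 so the consumer resumes after the acked record
    Map<TopicPartition, OffsetAndMetadata> commit =
        Collections.singletonMap(partition, new OffsetAndMetadata(lastAckedOffset + 1));
    consumer.commitSync(commit);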

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
new file mode 100644
index 0000000..082cc58
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -0,0 +1,259 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaSpoutMessagingGuaranteeTest {
+
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+
+    @Before
+    public void setUp() {
+        consumerMock = mock(KafkaConsumer.class);
+    }
+
+    @Test
+    public void testAtMostOnceModeCommitsBeforeEmit() throws Exception {
+        //At-most-once mode must commit tuples before they are emitted to the topology to ensure that a spout crash won't cause replays.
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .build();
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+            SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+        spout.nextTuple();
+
+        when(consumerMock.position(partition)).thenReturn(1L);
+
+        //The spout should have emitted the tuple, and must have committed it before emit
+        InOrder inOrder = inOrder(consumerMock, collectorMock);
+        inOrder.verify(consumerMock).poll(anyLong());
+        inOrder.verify(consumerMock).commitSync(commitCapture.capture());
+        inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+
+        CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+        assertThat(committedOffsets.get(partition).offset(), is(0L));
+        assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
+    }
+
+    private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) {
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+        when(consumerMock.poll(anyLong()))
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()))))
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, spoutConfig.getMaxUncommittedOffsets() - 1, spoutConfig.getMaxUncommittedOffsets()))));
+
+        for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() * 2; i++) {
+            spout.nextTuple();
+        }
+
+        verify(consumerMock, times(2)).poll(anyLong());
+        verify(collectorMock, times(spoutConfig.getMaxUncommittedOffsets() * 2)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+    }
+
+    @Test
+    public void testAtMostOnceModeDisregardsMaxUncommittedOffsets() throws Exception {
+        //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .build();
+        doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
+    }
+
+    @Test
+    public void testNoGuaranteeModeDisregardsMaxUncommittedOffsets() throws Exception {
+        //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
+            .build();
+        doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
+    }
+
+    private void doTestModeCannotReplayTuples(KafkaSpoutConfig<String, String> spoutConfig) {
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+            SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+        spout.nextTuple();
+
+        ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+        assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+        spout.fail(msgIdCaptor.getValue());
+
+        reset(consumerMock);
+
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+            SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 1, 1))));
+
+        spout.nextTuple();
+
+        //The consumer should not be seeking to retry the failed tuple, it should just be continuing from the current position
+        verify(consumerMock, never()).seek(eq(partition), anyLong());
+    }
+
+    @Test
+    public void testAtMostOnceModeCannotReplayTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not replay tuples in at-most-once mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .setTupleTrackingEnforced(true)
+            .build();
+        doTestModeCannotReplayTuples(spoutConfig);
+    }
+
+    @Test
+    public void testNoGuaranteeModeCannotReplayTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not replay tuples in no guarantee mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
+            .setTupleTrackingEnforced(true)
+            .build();
+        doTestModeCannotReplayTuples(spoutConfig);
+    }
+
+    @Test
+    public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .setTupleTrackingEnforced(true)
+            .build();
+        try (SimulatedTime time = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+            spout.nextTuple();
+            reset(consumerMock);
+
+            ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+            assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+            spout.ack(msgIdCaptor.getValue());
+
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
+
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<String, String>>>emptyMap()));
+
+            spout.nextTuple();
+
+            verify(consumerMock, never()).commitSync(argThat(new ArgumentMatcher<Map<TopicPartition, OffsetAndMetadata>>() {
+                @Override
+                public boolean matches(Object arg) {
+                    Map<TopicPartition, OffsetAndMetadata> castArg = (Map<TopicPartition, OffsetAndMetadata>) arg;
+                    return !castArg.containsKey(partition);
+                }
+            }));
+        }
+    }
+
+    @Test
+    public void testNoGuaranteeModeCommitsPolledTuples() throws Exception {
+        //When using the no guarantee mode, the spout must commit tuples periodically, regardless of whether they've been acked
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
+            .setTupleTrackingEnforced(true)
+            .build();
+
+        try (SimulatedTime time = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+            spout.nextTuple();
+
+            when(consumerMock.position(partition)).thenReturn(1L);
+
+            ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+            assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
+
+            spout.nextTuple();
+
+            verify(consumerMock).commitAsync(commitCapture.capture(), isNull(OffsetCommitCallback.class));
+
+            CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
+            Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+            assertThat(committedOffsets.get(partition).offset(), is(1L));
+            assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
+        }
+    }
+
+}
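For context, the configuration options exercised by these messaging guarantee tests are set the same way in a real topology. A usage sketch with a placeholder broker address and topic, assuming the default String deserializers that KafkaSpoutConfig.builder applies:

    //At-most-once processing; tuple tracking forced on so the spout still emits
    //message ids (the behavior the tests above rely on)
    KafkaSpoutConfig<String, String> atMostOnceConfig =
        KafkaSpoutConfig.builder("localhost:9092", "my-topic")
            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
            .setTupleTrackingEnforced(true)
            .build();
    KafkaSpout<String, String> spout = new KafkaSpout<>(atMostOnceConfig);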

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
new file mode 100644
index 0000000..c2c46b5
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.KafkaSpout.TIMER_DELAY_MS;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.KafkaUnitRule;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaSpoutReactivationTest {
+
+    @Rule
+    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+    private final TopologyContext topologyContext = mock(TopologyContext.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+    private final long commitOffsetPeriodMs = 2_000;
+    private KafkaConsumer<String, String> consumerSpy;
+    private KafkaConsumer<String, String> postReactivationConsumerSpy;
+    private KafkaSpout<String, String> spout;
+    private final int maxPollRecords = 10;
+
+    @Before
+    public void setUp() {
+        KafkaSpoutConfig<String, String> spoutConfig =
+            SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+                KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+                    SingleTopicKafkaSpoutConfiguration.TOPIC))
+                .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
+                .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+                .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
+                .build();
+        KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactoryDefault<>();
+        this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
+        this.postReactivationConsumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
+        KafkaConsumerFactory<String, String> consumerFactoryMock = mock(KafkaConsumerFactory.class);
+        when(consumerFactoryMock.createConsumer(any(KafkaSpoutConfig.class)))
+            .thenReturn(consumerSpy)
+            .thenReturn(postReactivationConsumerSpy);
+        this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock);
+    }
+
+    private void prepareSpout(int messageCount) throws Exception {
+        SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
+    }
+
+    private KafkaSpoutMessageId emitOne() {
+        ArgumentCaptor<KafkaSpoutMessageId> messageId = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        spout.nextTuple();
+        verify(collector).emit(anyString(), anyList(), messageId.capture());
+        reset(collector);
+        return messageId.getValue();
+    }
+
+    @Test
+    public void testSpoutMustHandleReactivationGracefully() throws Exception {
+        try (Time.SimulatedTime time = new Time.SimulatedTime()) {
+            int messageCount = maxPollRecords * 2;
+            prepareSpout(messageCount);
+
+            //Emit and ack some tuples, ensure that some polled tuples remain cached in the spout by emitting less than maxPollRecords
+            int beforeReactivationEmits = maxPollRecords - 3;
+            for (int i = 0; i < beforeReactivationEmits - 1; i++) {
+                KafkaSpoutMessageId msgId = emitOne();
+                spout.ack(msgId);
+            }
+
+            KafkaSpoutMessageId ackAfterDeactivateMessageId = emitOne();
+            
+            //Cycle spout activation
+            spout.deactivate();
+            SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, commitCapture, beforeReactivationEmits - 1);
+            //Tuples may be acked/failed after the spout deactivates, so we have to be able to handle this too
+            spout.ack(ackAfterDeactivateMessageId);
+            spout.activate();
+
+            //Emit and ack the rest
+            for (int i = beforeReactivationEmits; i < messageCount; i++) {
+                KafkaSpoutMessageId msgId = emitOne();
+                spout.ack(msgId);
+            }
+
+            //Commit
+            Time.advanceTime(TIMER_DELAY_MS + commitOffsetPeriodMs);
+            spout.nextTuple();
+
+            //Verify that no more tuples are emitted and all tuples are committed
+            SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(postReactivationConsumerSpy, commitCapture, messageCount);
+
+            reset(collector);
+            spout.nextTuple();
+            verify(collector, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+        }
+
+    }
+
+}
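SingleTopicKafkaUnitSetupHelper and its verifyAllMessagesCommitted method are likewise added elsewhere in this commit. A hypothetical sketch of what that check presumably does, assuming the usual Mockito and Hamcrest static imports (verify, atLeastOnce, assertThat, is):

    //Hypothetical sketch: capture the commitSync argument and check that each committed
    //offset (the next offset to read) equals the number of processed messages
    static void verifyAllMessagesCommitted(KafkaConsumer<String, String> consumerSpy,
            ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture, long messageCount) {
        verify(consumerSpy, atLeastOnce()).commitSync(commitCapture.capture());
        Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
        for (OffsetAndMetadata commit : commits.values()) {
            assertThat(commit.offset(), is(messageCount));
        }
    }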

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 1033e83..29d2a22 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -15,12 +15,12 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.hasKey;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -31,10 +31,12 @@ import static org.mockito.Mockito.when;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-
+import java.util.Set;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -52,12 +54,10 @@ import org.mockito.Captor;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
 public class KafkaSpoutRebalanceTest {
 
     @Captor
@@ -85,30 +85,24 @@ public class KafkaSpoutRebalanceTest {
     }
 
     //Returns messageIds in order of emission
-    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
-        //Setup spout with mock consumer so we can get at the rebalance listener
+    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture) {
+        //Setup spout with mock consumer so we can get at the rebalance listener   
         spout.open(conf, contextMock, collectorMock);
         spout.activate();
 
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
         //Assign partitions to the spout
         ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
-        List<TopicPartition> assignedPartitions = new ArrayList<>();
+        Set<TopicPartition> assignedPartitions = new HashSet<>();
         assignedPartitions.add(partitionThatWillBeRevoked);
         assignedPartitions.add(assignedPartition);
         consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+        when(consumerMock.assignment()).thenReturn(assignedPartitions);
 
         //Make the consumer return a single message for each partition
-        Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
-        firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord<>(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
-        Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
-        secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord<>(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
         when(consumerMock.poll(anyLong()))
-            .thenReturn(new ConsumerRecords(firstPartitionRecords))
-            .thenReturn(new ConsumerRecords(secondPartitionRecords))
-            .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partitionThatWillBeRevoked, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partitionThatWillBeRevoked, 0, 1))))
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(assignedPartition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(assignedPartition, 0, 1))))
+            .thenReturn(new ConsumerRecords<>(new HashMap<TopicPartition, List<ConsumerRecord<String, String>>>()));
 
         //Emit the messages
         spout.nextTuple();
@@ -122,6 +116,7 @@ public class KafkaSpoutRebalanceTest {
         //Now rebalance
         consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
         consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
+        when(consumerMock.assignment()).thenReturn(Collections.singleton(assignedPartition));
 
         List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
         emittedMessageIds.add(messageIdForRevokedPartition.getValue());
@@ -133,7 +128,12 @@ public class KafkaSpoutRebalanceTest {
     public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+            ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+            Subscription subscriptionMock = mock(Subscription.class);
+            doNothing()
+                .when(subscriptionMock)
+                .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
+            KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
                 .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
                 .build(), consumerFactoryMock);
             String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
@@ -141,7 +141,8 @@ public class KafkaSpoutRebalanceTest {
             TopicPartition assignedPartition = new TopicPartition(topic, 2);
 
             //Emit a message on each partition and revoke the first partition
-            List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+            List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
+                spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
 
             //Ack both emitted tuples
             spout.ack(emittedMessageIds.get(0));
@@ -163,8 +164,13 @@ public class KafkaSpoutRebalanceTest {
     @Test
     public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        Subscription subscriptionMock = mock(Subscription.class);
+        doNothing()
+            .when(subscriptionMock)
+            .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
         KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
-        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
             .setOffsetCommitPeriodMs(10)
             .setRetry(retryServiceMock)
             .build(), consumerFactoryMock);
@@ -177,7 +183,8 @@ public class KafkaSpoutRebalanceTest {
             .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0));
 
         //Emit a message on each partition and revoke the first partition
-        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
+            spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
 
         //Check that only two message ids were generated
         verify(retryServiceMock, times(2)).getMessageId(Mockito.any(ConsumerRecord.class));
@@ -199,7 +206,11 @@ public class KafkaSpoutRebalanceTest {
          */
 
         ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+        Subscription subscriptionMock = mock(Subscription.class);
+        doNothing()
+            .when(subscriptionMock)
+            .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
+        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
             .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
             .build(), consumerFactoryMock);
         String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
@@ -209,8 +220,6 @@ public class KafkaSpoutRebalanceTest {
         //Setup spout with mock consumer so we can get at the rebalance listener   
         spout.open(conf, contextMock, collectorMock);
         spout.activate();
-        
-        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
 
         //Assign partitions to the spout
         ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();

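For reference, a minimal sketch of the Subscription-mock pattern this rebalance test now uses instead of stubbing KafkaConsumer.subscribe() directly, assuming Subscription.subscribe(KafkaConsumer, ConsumerRebalanceListener, TopologyContext) and KafkaConsumerFactory.createConsumer(KafkaSpoutConfig) keep the signatures shown in this diff; it is an illustration, not the committed test code:

    package org.apache.storm.kafka.spout;

    import java.util.Collections;
    import java.util.HashMap;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.mockito.ArgumentCaptor;
    import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
    import static org.mockito.Matchers.any;
    import static org.mockito.Mockito.doNothing;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    public class RebalanceListenerCaptureSketch {

        /**
         * Opens a spout around the given mocked consumer and returns the rebalance
         * listener the spout registered with its (mocked) Subscription, so a test can
         * drive partition assignment and revocation by hand.
         */
        @SuppressWarnings("unchecked")
        public static ConsumerRebalanceListener openSpoutAndCaptureListener(
                KafkaConsumer<String, String> consumerMock, TopicPartition partition) {
            // The spout no longer calls KafkaConsumer.subscribe() itself, so the listener
            // is captured from the Subscription instead.
            ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture =
                ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
            Subscription subscriptionMock = mock(Subscription.class);
            doNothing()
                .when(subscriptionMock)
                .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));

            KafkaConsumerFactory<String, String> consumerFactoryMock = mock(KafkaConsumerFactory.class);
            when(consumerFactoryMock.createConsumer(any(KafkaSpoutConfig.class))).thenReturn(consumerMock);

            KafkaSpout<String, String> spout = new KafkaSpout<>(
                createKafkaSpoutConfigBuilder(subscriptionMock, -1).build(), consumerFactoryMock);
            spout.open(new HashMap<String, Object>(), mock(TopologyContext.class), mock(SpoutOutputCollector.class));
            spout.activate();

            // Simulate an initial assignment; the spout also checks assignment() on the consumer,
            // so the mock has to be kept consistent, as the tests above do after each rebalance.
            ConsumerRebalanceListener listener = rebalanceListenerCapture.getValue();
            listener.onPartitionsAssigned(Collections.singleton(partition));
            when(consumerMock.assignment()).thenReturn(Collections.singleton(partition));
            return listener;
        }
    }

The tests above inline this setup so they keep references to both the spout and the captor; the helper form here is only to keep the sketch compact.
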
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index dac4bff..569becf 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -15,103 +15,84 @@
  */
 package org.apache.storm.kafka.spout;
 
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
 
-public class KafkaSpoutRetryLimitTest {
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
 
+public class KafkaSpoutRetryLimitTest {
+    
     private final long offsetCommitPeriodMs = 2_000;
     private final TopologyContext contextMock = mock(TopologyContext.class);
     private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
     private final Map<String, Object> conf = new HashMap<>();
     private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
     private KafkaConsumer<String, String> consumerMock;
-    private KafkaSpout<String, String> spout;
-    private KafkaSpoutConfig spoutConfig;
-
+    private KafkaSpoutConfig<String, String> spoutConfig;
+    
     public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
-            new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
-                    0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
-
-    private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) {
-        spoutConfig = getKafkaSpoutConfigBuilder(-1)
-                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
-                .setRetry(ZERO_RETRIES_RETRY_SERVICE)
-                .build();
-
+        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+    
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+    
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+            .build();
         consumerMock = mock(KafkaConsumer.class);
-        KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactory<String, String>() {
-            @Override
-            public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
-                return consumerMock;
-            }
-        };
-
-        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
-        spout.open(conf, contextMock, collectorMock);
-        spout.activate();
-
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
-        //Assign partitions to the spout
-        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
-        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
     }
-
+    
     @Test
     public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
         //Spout should ack failed messages after they hit the retry limit
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            setupSpoutWithNoRetry(Collections.singleton(partition));
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
-            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
             int lastOffset = 3;
-            for (int i = 0; i <= lastOffset; i++) {
-                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
-            }
-            records.put(partition, recordsForPartition);
-
+            int numRecords = lastOffset + 1;
+            records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, numRecords));
+            
             when(consumerMock.poll(anyLong()))
-                    .thenReturn(new ConsumerRecords(records));
-
-            for (int i = 0; i < recordsForPartition.size(); i++) {
+                .thenReturn(new ConsumerRecords<>(records));
+            
+            for (int i = 0; i < numRecords; i++) {
                 spout.nextTuple();
             }
-
+            
             ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-            verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
-
+            verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), messageIds.capture());
+            
             for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
                 spout.fail(messageId);
             }
@@ -119,16 +100,15 @@ public class KafkaSpoutRetryLimitTest {
             // Advance time and then trigger call to kafka consumer commit
             Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
             spout.nextTuple();
-
-            ArgumentCaptor<Map> committedOffsets=ArgumentCaptor.forClass(Map.class);
+            
             InOrder inOrder = inOrder(consumerMock);
-            inOrder.verify(consumerMock).commitSync(committedOffsets.capture());
+            inOrder.verify(consumerMock).commitSync(commitCapture.capture());
             inOrder.verify(consumerMock).poll(anyLong());
 
             //verify that offset 4 was committed for the given TopicPartition, since processing should resume at 4.
-            assertTrue(committedOffsets.getValue().containsKey(partition));
-            assertEquals(lastOffset + 1, ((OffsetAndMetadata) (committedOffsets.getValue().get(partition))).offset());
+            assertTrue(commitCapture.getValue().containsKey(partition));
+            assertEquals(lastOffset + 1, ((OffsetAndMetadata) (commitCapture.getValue().get(partition))).offset());
         }
     }
-
-}
\ No newline at end of file
+    
+}
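
For reference, a minimal sketch of the commit assertion that closes the test above, assuming the same test helpers and the KafkaSpout.TIMER_DELAY_MS constant shown in this diff; topic, offsets and the commit period are illustrative:

    package org.apache.storm.kafka.spout;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.utils.Time;
    import org.apache.storm.utils.Time.SimulatedTime;
    import org.junit.Test;
    import org.mockito.ArgumentCaptor;
    import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
    import static org.junit.Assert.assertEquals;
    import static org.mockito.Matchers.anyList;
    import static org.mockito.Matchers.anyLong;
    import static org.mockito.Matchers.anyString;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;
    import static org.mockito.Mockito.when;

    public class CommitAssertionSketchTest {

        @SuppressWarnings({"unchecked", "rawtypes"})
        @Test
        public void committedOffsetIsOnePastLastAckedRecord() {
            try (SimulatedTime simulatedTime = new SimulatedTime()) {
                long offsetCommitPeriodMs = 2_000;
                TopicPartition partition = new TopicPartition("test", 0);
                KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
                SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
                KafkaSpoutConfig<String, String> spoutConfig =
                    createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
                        .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
                        .build();
                KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(
                    spoutConfig, new HashMap<String, Object>(), mock(TopologyContext.class),
                    collectorMock, consumerMock, partition);

                // A single record at offset 0.
                when(consumerMock.poll(anyLong()))
                    .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
                        SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));

                // Emit and ack the tuple.
                spout.nextTuple();
                ArgumentCaptor<KafkaSpoutMessageId> messageId = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
                verify(collectorMock).emit(anyString(), anyList(), messageId.capture());
                spout.ack(messageId.getValue());

                // Advance past the commit period so the next nextTuple() triggers commitSync.
                Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
                spout.nextTuple();

                // The committed offset is one past the acked record, where processing resumes.
                ArgumentCaptor<Map> commitCapture = ArgumentCaptor.forClass(Map.class);
                verify(consumerMock).commitSync(commitCapture.capture());
                assertEquals(1, ((OffsetAndMetadata) commitCapture.getValue().get(partition)).offset());
            }
        }
    }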