Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/07 21:05:25 UTC
[3/7] storm git commit: STORM-2936 Overwrite latest storm-kafka-client 1.x-branch into 1.1.x-branch
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
index 447f8c4..dbba04b 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -15,9 +15,6 @@
*/
package org.apache.storm.kafka.spout;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyCollection;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
@@ -26,35 +23,36 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
-import java.util.HashSet;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
import org.mockito.InOrder;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.eq;
+
public class KafkaSpoutEmitTest {
private final long offsetCommitPeriodMs = 2_000;
@@ -63,50 +61,26 @@ public class KafkaSpoutEmitTest {
private final Map<String, Object> conf = new HashMap<>();
private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
private KafkaConsumer<String, String> consumerMock;
- private KafkaSpout<String, String> spout;
- private KafkaSpoutConfig spoutConfig;
+ private KafkaSpoutConfig<String, String> spoutConfig;
- private void setupSpout(Set<TopicPartition> assignedPartitions) {
- spoutConfig = getKafkaSpoutConfigBuilder(-1)
+ @Before
+ public void setUp() {
+ spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
.setOffsetCommitPeriodMs(offsetCommitPeriodMs)
.build();
-
consumerMock = mock(KafkaConsumer.class);
- KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactory<String, String>() {
- @Override
- public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
- return consumerMock;
- }
- };
-
- //Set up a spout listening to 1 topic partition
- spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
- spout.open(conf, contextMock, collectorMock);
- spout.activate();
-
- ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
- verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
- //Assign partitions to the spout
- ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
- consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
}
@Test
public void testNextTupleEmitsAtMostOneTuple() {
//The spout should emit at most one message per call to nextTuple
//This is necessary for Storm to be able to throttle the spout according to maxSpoutPending
- setupSpout(Collections.singleton(partition));
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
- List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
- }
- records.put(partition, recordsForPartition);
+ records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 10));
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(records));
+ .thenReturn(new ConsumerRecords<>(records));
spout.nextTuple();
@@ -114,29 +88,26 @@ public class KafkaSpoutEmitTest {
}
@Test
- public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() {
+ public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() throws IOException {
//The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded
-
+
//Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed
try (SimulatedTime simulatedTime = new SimulatedTime()) {
- setupSpout(Collections.singleton(partition));
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
- List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
- for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
- //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
- recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
- }
- records.put(partition, recordsForPartition);
+ int numRecords = spoutConfig.getMaxUncommittedOffsets();
+ //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+ records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, numRecords));
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(records));
+ .thenReturn(new ConsumerRecords<>(records));
- for (int i = 0; i < recordsForPartition.size(); i++) {
+ for (int i = 0; i < numRecords; i++) {
spout.nextTuple();
}
ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
+ verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), messageIds.capture());
for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
spout.fail(messageId);
@@ -146,16 +117,16 @@ public class KafkaSpoutEmitTest {
Time.advanceTime(50);
//No backoff for test retry service, just check that messages will retry immediately
- for (int i = 0; i < recordsForPartition.size(); i++) {
+ for (int i = 0; i < numRecords; i++) {
spout.nextTuple();
}
ArgumentCaptor<KafkaSpoutMessageId> retryMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), retryMessageIds.capture());
+ verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), retryMessageIds.capture());
//Verify that the poll started at the earliest retriable tuple offset
List<Long> failedOffsets = new ArrayList<>();
- for(KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+ for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
failedOffsets.add(msgId.offset());
}
InOrder inOrder = inOrder(consumerMock);
@@ -163,93 +134,80 @@ public class KafkaSpoutEmitTest {
inOrder.verify(consumerMock).poll(anyLong());
}
}
-
+
@Test
- public void testNextTupleEmitsAtMostMaxUncommittedOffsetsPlusMaxPollRecordsWhenRetryingTuples() {
- /*
- The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded.
- numUncommittedOffsets is equal to numNonRetriableEmittedTuples + numRetriableTuples.
- The spout will only emit if numUncommittedOffsets - numRetriableTuples < maxUncommittedOffsets (i.e. numNonRetriableEmittedTuples < maxUncommittedOffsets)
- This means that the latest offset a poll can start at for a retriable partition,
- counting from the last committed offset, is maxUncommittedOffsets,
- where there are maxUncommittedOffsets - 1 uncommitted tuples "to the left".
- If the retry poll starts at that offset, it at most emits the retried tuple plus maxPollRecords - 1 new tuples.
- The limit on uncommitted offsets for one partition is therefore maxUncommittedOffsets + maxPollRecords - 1.
-
- It is only necessary to test this for a single partition, because partitions can't contribute negatively to numNonRetriableEmittedTuples,
- so if the limit holds for one partition, it will also hold for each individual partition when multiple are involved.
-
- This makes the actual limit numPartitions * (maxUncommittedOffsets + maxPollRecords - 1)
- */
-
- //Emit maxUncommittedOffsets messages, and fail only the last. Then ensure that the spout will allow no more than maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets when retrying
+ public void testSpoutWillSkipPartitionsAtTheMaxUncommittedOffsetsLimit() {
+ //This verifies that partitions can't prevent each other from retrying tuples due to the maxUncommittedOffsets limit.
try (SimulatedTime simulatedTime = new SimulatedTime()) {
- setupSpout(Collections.singleton(partition));
-
- Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPollRecords = new HashMap<>();
- List<ConsumerRecord<String, String>> firstPollRecordsForPartition = new ArrayList<>();
- for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
- //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
- firstPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
- }
- firstPollRecords.put(partition, firstPollRecordsForPartition);
-
- int maxPollRecords = 5;
- Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPollRecords = new HashMap<>();
- List<ConsumerRecord<String, String>> secondPollRecordsForPartition = new ArrayList<>();
- for(int i = 0; i < maxPollRecords; i++) {
- secondPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
- }
- secondPollRecords.put(partition, secondPollRecordsForPartition);
+ TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo);
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+ //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+ records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()));
+ records.put(partitionTwo, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partitionTwo, 0, spoutConfig.getMaxUncommittedOffsets() + 1));
+ int numMessages = spoutConfig.getMaxUncommittedOffsets() * 2 + 1;
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(firstPollRecords))
- .thenReturn(new ConsumerRecords(secondPollRecords));
+ .thenReturn(new ConsumerRecords<>(records));
- for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() + maxPollRecords; i++) {
+ for (int i = 0; i < numMessages; i++) {
spout.nextTuple();
}
ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- verify(collectorMock, times(firstPollRecordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
-
- KafkaSpoutMessageId failedMessageId = messageIds.getAllValues().get(messageIds.getAllValues().size() - 1);
- spout.fail(failedMessageId);
-
- reset(collectorMock);
-
- //Now make the single failed tuple retriable
- Time.advanceTime(50);
- //The spout should allow another poll since there are now only maxUncommittedOffsets - 1 nonretriable tuples
- for (int i = 0; i < firstPollRecordsForPartition.size() + maxPollRecords; i++) {
- spout.nextTuple();
+ verify(collectorMock, times(numMessages)).emit(anyString(), anyList(), messageIds.capture());
+
+ //Now fail a tuple on partition one and verify that it is allowed to retry, because the failed tuple is below the maxUncommittedOffsets limit
+ KafkaSpoutMessageId failedMessageIdPartitionOne = null;
+ for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+ if (msgId.partition() == partition.partition()) {
+ failedMessageIdPartitionOne = msgId;
+ break;
+ }
+ }
+
+ spout.fail(failedMessageIdPartitionOne);
+
+ //Also fail the last tuple from partition two. Since the failed tuple is beyond the maxUncommittedOffsets limit, it should not be retried until earlier messages are acked.
+ KafkaSpoutMessageId failedMessageIdPartitionTwo = null;
+ for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+ if (msgId.partition() == partitionTwo.partition()) {
+ if (failedMessageIdPartitionTwo != null) {
+ if (msgId.offset() >= failedMessageIdPartitionTwo.offset()) {
+ failedMessageIdPartitionTwo = msgId;
+ }
+ } else {
+ failedMessageIdPartitionTwo = msgId;
+ }
+ }
}
- ArgumentCaptor<KafkaSpoutMessageId> retryBatchMessageIdsCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- verify(collectorMock, times(maxPollRecords)).emit(anyString(), anyList(), retryBatchMessageIdsCaptor.capture());
+ spout.fail(failedMessageIdPartitionTwo);
+
reset(collectorMock);
- //Check that the consumer started polling at the failed tuple offset
+ Time.advanceTime(50);
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, failedMessageIdPartitionOne.offset(), 1))));
+
+ spout.nextTuple();
+
+ verify(collectorMock, times(1)).emit(anyString(), anyList(), anyObject());
+
InOrder inOrder = inOrder(consumerMock);
- inOrder.verify(consumerMock).seek(partition, failedMessageId.offset());
+ inOrder.verify(consumerMock).seek(partition, failedMessageIdPartitionOne.offset());
+ //Should not seek on the paused partition
+ inOrder.verify(consumerMock, never()).seek(eq(partitionTwo), anyLong());
+ inOrder.verify(consumerMock).pause(Collections.singleton(partitionTwo));
inOrder.verify(consumerMock).poll(anyLong());
+ inOrder.verify(consumerMock).resume(Collections.singleton(partitionTwo));
- //Now fail all except one of the last batch, and check that the spout won't reemit any tuples because there are more than maxUncommittedOffsets nonretriable tuples
- Time.advanceTime(50);
- List<KafkaSpoutMessageId> retryBatchMessageIds = retryBatchMessageIdsCaptor.getAllValues();
- KafkaSpoutMessageId firstTupleFromRetryBatch = retryBatchMessageIds.remove(0);
- for(KafkaSpoutMessageId msgId : retryBatchMessageIds) {
- spout.fail(msgId);
- }
- for (int i = 0; i < firstPollRecordsForPartition.size() + maxPollRecords; i++) {
- spout.nextTuple();
- }
- verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
+ reset(collectorMock);
- //Fail the last tuple, which brings the number of nonretriable tuples back under the limit, and check that the spout polls again
- spout.fail(firstTupleFromRetryBatch);
+ //Now also check that no more tuples are polled for, since both partitions are at their limits
spout.nextTuple();
- verify(collectorMock, times(1)).emit(anyString(), anyList(), anyObject());
+
+ verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
}
}
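
All of the KafkaSpoutEmitTest cases above follow the same stubbing pattern: poll() on the mocked consumer is stubbed to hand the spout a canned batch, and the mocked collector captures what nextTuple() emits. Below is a minimal sketch of that pattern, reusing the fixture fields and imports of KafkaSpoutEmitTest and the SpoutWithMockedConsumerSetupHelper introduced by this commit; the test name itself is illustrative, not part of the commit.

    @Test
    public void sketchPollStubAndEmitCapture() {
        //Set up a spout whose consumer is the mock, assigned one partition
        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper
            .setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
        //Hand the spout one canned record on the next poll
        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(
            Collections.singletonMap(partition,
                SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
        spout.nextTuple();
        //Capture the emitted message id so the test can ack or fail it
        ArgumentCaptor<KafkaSpoutMessageId> msgId = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
        verify(collectorMock).emit(anyString(), anyList(), msgId.capture());
        spout.ack(msgId.getValue());
    }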
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
new file mode 100644
index 0000000..09f7fc5
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
+
+import static org.hamcrest.CoreMatchers.is;
+
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+
+public class KafkaSpoutLogCompactionSupportTest {
+
+ private final long offsetCommitPeriodMs = 2_000;
+ private final TopologyContext contextMock = mock(TopologyContext.class);
+ private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+ private final Map<String, Object> conf = new HashMap<>();
+ private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+ private KafkaConsumer<String, String> consumerMock;
+ private KafkaSpoutConfig<String, String> spoutConfig;
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .build();
+ consumerMock = mock(KafkaConsumer.class);
+ }
+
+ @Test
+ public void testCommitSuccessWithOffsetVoids() {
+ //Verify that the commit logic can handle offset voids due to log compaction
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+ List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+ // Offsets emitted are 0,1,2,3,4,<void>,8,9
+ recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 5));
+ recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 8, 2));
+ records.put(partition, recordsForPartition);
+
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords<>(records));
+
+ for (int i = 0; i < recordsForPartition.size(); i++) {
+ spout.nextTuple();
+ }
+
+ ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
+
+ for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+ spout.ack(messageId);
+ }
+
+ // Advance time and then trigger the first kafka consumer commit; the commit must progress past the void to offset 10
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords<String, String>(Collections.<TopicPartition, List<ConsumerRecord<String, String>>>emptyMap()));
+ spout.nextTuple();
+
+ InOrder inOrder = inOrder(consumerMock);
+ inOrder.verify(consumerMock).commitSync(commitCapture.capture());
+ inOrder.verify(consumerMock).poll(anyLong());
+
+ //Verify that offset 10 was the last committed offset, since this is the offset the spout should resume at
+ Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
+ assertTrue(commits.containsKey(partition));
+ assertEquals(10, commits.get(partition).offset());
+ }
+ }
+
+ @Test
+ public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAway() {
+ /*
+ Verify that failed offsets will only retry if the corresponding message exists.
+ When log compaction is enabled in Kafka it is possible that a tuple can fail,
+ and then be impossible to retry because the message in Kafka has been deleted.
+ The spout needs to quietly ack such tuples to allow commits to progress past the deleted offset.
+ */
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo);
+
+ List<KafkaSpoutMessageId> firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper
+ .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2);
+ reset(collectorMock);
+ List<KafkaSpoutMessageId> secondPartitionMsgIds = SpoutWithMockedConsumerSetupHelper
+ .pollAndEmit(spout, consumerMock, 3, collectorMock, partitionTwo, 0, 1, 2);
+ reset(collectorMock);
+
+ for (int i = 0; i < 3; i++) {
+ spout.fail(firstPartitionMsgIds.get(i));
+ spout.fail(secondPartitionMsgIds.get(i));
+ }
+
+ Time.advanceTime(50);
+
+ //The failed tuples are ready for retry. Make it appear like 0 and 1 on the first partition were compacted away.
+ //In this case the second partition acts as a control to verify that we only skip past offsets that are no longer present.
+ Map<TopicPartition, int[]> retryOffsets = new HashMap<>();
+ retryOffsets.put(partition, new int[] {2});
+ retryOffsets.put(partitionTwo, new int[] {0, 1, 2});
+ int expectedEmits = 4; //offset 2 on the first partition, offsets 0-2 on the second
+ List<KafkaSpoutMessageId> retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, retryOffsets);
+
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+ spout.nextTuple();
+
+ verify(consumerMock).commitSync(commitCapture.capture());
+ Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue();
+ assertThat(committed.keySet(), is(Collections.singleton(partition)));
+ assertThat("The first partition should have committed up to the first retriable tuple that is not missing", committed.get(partition).offset(), is(2L));
+
+ for (KafkaSpoutMessageId msgId : retryMessageIds) {
+ spout.ack(msgId);
+ }
+
+ //The spout should now commit all the offsets, since all offsets are either acked or were missing when retrying
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+ spout.nextTuple();
+
+ verify(consumerMock, times(2)).commitSync(commitCapture.capture());
+ committed = commitCapture.getValue();
+ assertThat(committed, hasKey(partition));
+ assertThat(committed, hasKey(partitionTwo));
+ assertThat(committed.get(partition).offset(), is(3L));
+ assertThat(committed.get(partitionTwo).offset(), is(3L));
+ }
+ }
+
+ @Test
+ public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAwayWithoutAckingPendingTuples() {
+ //Demonstrate that the spout doesn't ack pending tuples when skipping compacted tuples. The pending tuples should be allowed to finish normally.
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+ List<KafkaSpoutMessageId> firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper
+ .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2);
+ reset(collectorMock);
+
+ spout.fail(firstPartitionMsgIds.get(0));
+ spout.fail(firstPartitionMsgIds.get(2));
+
+ Time.advanceTime(50);
+
+ //The failed tuples are ready for retry. Make it appear like 0 and 1 were compacted away.
+ List<KafkaSpoutMessageId> retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, 1, collectorMock, partition, 2);
+ for (KafkaSpoutMessageId msgId : retryMessageIds) {
+ spout.ack(msgId);
+ }
+
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+ spout.nextTuple();
+
+ verify(consumerMock).commitSync(commitCapture.capture());
+ Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue();
+ assertThat(committed.keySet(), is(Collections.singleton(partition)));
+ assertThat("The first partition should have committed the missing offset, but no further since the next tuple is pending",
+ committed.get(partition).offset(), is(1L));
+
+ spout.ack(firstPartitionMsgIds.get(1));
+
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+ spout.nextTuple();
+
+ verify(consumerMock, times(2)).commitSync(commitCapture.capture());
+ committed = commitCapture.getValue();
+ assertThat(committed.keySet(), is(Collections.singleton(partition)));
+ assertThat("The first partition should have committed all offsets", committed.get(partition).offset(), is(3L));
+ }
+ }
+
+}
\ No newline at end of file
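
The invariant these log-compaction tests pin down is small enough to state in code. The sketch below is illustrative only, not the spout's actual OffsetManager: once offsets that were compacted away have been quietly acked, the offset passed to commitSync is simply one past the longest contiguous run of acked offsets.

    import java.util.NavigableSet;
    import java.util.TreeSet;

    final class CommitOffsetSketch {
        //One past the longest contiguous run of acked offsets starting at committed
        static long nextCommitOffset(long committed, NavigableSet<Long> acked) {
            long next = committed;
            while (acked.contains(next)) {
                next++;
            }
            return next;
        }

        public static void main(String[] args) {
            NavigableSet<Long> acked = new TreeSet<>();
            for (long o : new long[]{0, 1, 2, 3, 4, 8, 9}) acked.add(o); //emitted and acked
            for (long o : new long[]{5, 6, 7}) acked.add(o); //compacted away, quietly acked
            System.out.println(nextCommitOffset(0, acked)); //prints 10, as asserted above
        }
    }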
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
new file mode 100644
index 0000000..082cc58
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -0,0 +1,259 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaSpoutMessagingGuaranteeTest {
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+ private final TopologyContext contextMock = mock(TopologyContext.class);
+ private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+ private final Map<String, Object> conf = new HashMap<>();
+ private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+ private KafkaConsumer<String, String> consumerMock;
+
+ @Before
+ public void setUp() {
+ consumerMock = mock(KafkaConsumer.class);
+ }
+
+ @Test
+ public void testAtMostOnceModeCommitsBeforeEmit() throws Exception {
+ //At-most-once mode must commit tuples before they are emitted to the topology to ensure that a spout crash won't cause replays.
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+ .build();
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+ spout.nextTuple();
+
+ when(consumerMock.position(partition)).thenReturn(1L);
+
+ //The spout should have emitted the tuple, and must have committed it before emit
+ InOrder inOrder = inOrder(consumerMock, collectorMock);
+ inOrder.verify(consumerMock).poll(anyLong());
+ inOrder.verify(consumerMock).commitSync(commitCapture.capture());
+ inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+
+ CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+ assertThat(committedOffsets.get(partition).offset(), is(0L));
+ assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
+ }
+
+ private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) {
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()))))
+ .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, spoutConfig.getMaxUncommittedOffsets() - 1, spoutConfig.getMaxUncommittedOffsets()))));
+
+ for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() * 2; i++) {
+ spout.nextTuple();
+ }
+
+ verify(consumerMock, times(2)).poll(anyLong());
+ verify(collectorMock, times(spoutConfig.getMaxUncommittedOffsets() * 2)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+ }
+
+ @Test
+ public void testAtMostOnceModeDisregardsMaxUncommittedOffsets() throws Exception {
+ //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+ .build();
+ doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
+ }
+
+ @Test
+ public void testNoGuaranteeModeDisregardsMaxUncommittedOffsets() throws Exception {
+ //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
+ .build();
+ doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
+ }
+
+ private void doTestModeCannotReplayTuples(KafkaSpoutConfig<String, String> spoutConfig) {
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+ spout.nextTuple();
+
+ ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+ assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+ spout.fail(msgIdCaptor.getValue());
+
+ reset(consumerMock);
+
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 1, 1))));
+
+ spout.nextTuple();
+
+ //The consumer should not seek back to retry the failed tuple; it should just continue from the current position
+ verify(consumerMock, never()).seek(eq(partition), anyLong());
+ }
+
+ @Test
+ public void testAtMostOnceModeCannotReplayTuples() throws Exception {
+ //When tuple tracking is enabled, the spout must not replay tuples in at-most-once mode
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+ .setTupleTrackingEnforced(true)
+ .build();
+ doTestModeCannotReplayTuples(spoutConfig);
+ }
+
+ @Test
+ public void testNoGuaranteeModeCannotReplayTuples() throws Exception {
+ //When tuple tracking is enabled, the spout must not replay tuples in no guarantee mode
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
+ .setTupleTrackingEnforced(true)
+ .build();
+ doTestModeCannotReplayTuples(spoutConfig);
+ }
+
+ @Test
+ public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
+ //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+ .setTupleTrackingEnforced(true)
+ .build();
+ try (SimulatedTime time = new SimulatedTime()) {
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+ spout.nextTuple();
+ reset(consumerMock);
+
+ ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+ assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+ spout.ack(msgIdCaptor.getValue());
+
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
+
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<String, String>>>emptyMap()));
+
+ spout.nextTuple();
+
+ verify(consumerMock, never()).commitSync(argThat(new ArgumentMatcher<Map<TopicPartition, OffsetAndMetadata>>() {
+ @Override
+ public boolean matches(Object arg) {
+ Map<TopicPartition, OffsetAndMetadata> castArg = (Map<TopicPartition, OffsetAndMetadata>) arg;
+ return !castArg.containsKey(partition);
+ }
+ }));
+ }
+ }
+
+ @Test
+ public void testNoGuaranteeModeCommitsPolledTuples() throws Exception {
+ //When using the no guarantee mode, the spout must commit tuples periodically, regardless of whether they've been acked
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
+ .setTupleTrackingEnforced(true)
+ .build();
+
+ try (SimulatedTime time = new SimulatedTime()) {
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+ spout.nextTuple();
+
+ when(consumerMock.position(partition)).thenReturn(1L);
+
+ ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+ assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
+
+ spout.nextTuple();
+
+ verify(consumerMock).commitAsync(commitCapture.capture(), isNull(OffsetCommitCallback.class));
+
+ CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+ assertThat(committedOffsets.get(partition).offset(), is(1L));
+ assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
+ }
+ }
+
+}
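
The ordering pinned down by testAtMostOnceModeCommitsBeforeEmit (poll, then commitSync, then emit) is the heart of the at-most-once guarantee. A hedged sketch of that control flow follows; it is not the spout's implementation, the stream id and tuple layout are placeholders, and it uses the no-arg commitSync for brevity where the real spout commits an explicit offset map carrying CommitMetadataManager metadata, as the assertions above show.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.tuple.Values;

    final class AtMostOnceSketch {
        //Commit before emit: a crash between the two drops tuples rather than replaying them
        static void pollCommitThenEmit(KafkaConsumer<String, String> consumer,
                SpoutOutputCollector collector, long pollTimeoutMs) {
            ConsumerRecords<String, String> batch = consumer.poll(pollTimeoutMs);
            consumer.commitSync(); //commit the polled offsets first
            for (ConsumerRecord<String, String> record : batch) {
                collector.emit("default", new Values(record.key(), record.value()));
            }
        }
    }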
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
new file mode 100644
index 0000000..c2c46b5
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.KafkaSpout.TIMER_DELAY_MS;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.KafkaUnitRule;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaSpoutReactivationTest {
+
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+ private final TopologyContext topologyContext = mock(TopologyContext.class);
+ private final Map<String, Object> conf = new HashMap<>();
+ private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+ private final long commitOffsetPeriodMs = 2_000;
+ private KafkaConsumer<String, String> consumerSpy;
+ private KafkaConsumer<String, String> postReactivationConsumerSpy;
+ private KafkaSpout<String, String> spout;
+ private final int maxPollRecords = 10;
+
+ @Before
+ public void setUp() {
+ KafkaSpoutConfig<String, String> spoutConfig =
+ SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+ KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+ SingleTopicKafkaSpoutConfiguration.TOPIC))
+ .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
+ .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+ .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
+ .build();
+ KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactoryDefault<>();
+ this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
+ this.postReactivationConsumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
+ KafkaConsumerFactory<String, String> consumerFactoryMock = mock(KafkaConsumerFactory.class);
+ when(consumerFactoryMock.createConsumer(any(KafkaSpoutConfig.class)))
+ .thenReturn(consumerSpy)
+ .thenReturn(postReactivationConsumerSpy);
+ this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock);
+ }
+
+ private void prepareSpout(int messageCount) throws Exception {
+ SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
+ SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
+ }
+
+ private KafkaSpoutMessageId emitOne() {
+ ArgumentCaptor<KafkaSpoutMessageId> messageId = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ spout.nextTuple();
+ verify(collector).emit(anyString(), anyList(), messageId.capture());
+ reset(collector);
+ return messageId.getValue();
+ }
+
+ @Test
+ public void testSpoutMustHandleReactivationGracefully() throws Exception {
+ try (Time.SimulatedTime time = new Time.SimulatedTime()) {
+ int messageCount = maxPollRecords * 2;
+ prepareSpout(messageCount);
+
+ //Emit and ack some tuples; ensure that some polled tuples remain cached in the spout by emitting fewer than maxPollRecords
+ int beforeReactivationEmits = maxPollRecords - 3;
+ for (int i = 0; i < beforeReactivationEmits - 1; i++) {
+ KafkaSpoutMessageId msgId = emitOne();
+ spout.ack(msgId);
+ }
+
+ KafkaSpoutMessageId ackAfterDeactivateMessageId = emitOne();
+
+ //Cycle spout activation
+ spout.deactivate();
+ SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, commitCapture, beforeReactivationEmits - 1);
+ //Tuples may be acked/failed after the spout deactivates, so we have to be able to handle this too
+ spout.ack(ackAfterDeactivateMessageId);
+ spout.activate();
+
+ //Emit and ack the rest
+ for (int i = beforeReactivationEmits; i < messageCount; i++) {
+ KafkaSpoutMessageId msgId = emitOne();
+ spout.ack(msgId);
+ }
+
+ //Commit
+ Time.advanceTime(TIMER_DELAY_MS + commitOffsetPeriodMs);
+ spout.nextTuple();
+
+ //Verify that no more tuples are emitted and all tuples are committed
+ SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(postReactivationConsumerSpy, commitCapture, messageCount);
+
+ reset(collector);
+ spout.nextTuple();
+ verify(collector, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+ }
+
+ }
+
+}
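
The consumer factory stubbing in setUp above relies on Mockito's consecutive-answer chaining: each thenReturn answers one call, and the last answer repeats for every call after that. This is what hands the spout a fresh consumer on reactivation, letting the test verify commits on the pre-deactivation consumer and emits on the post-reactivation one. The mechanism in isolation, using the names from the diff:

    KafkaConsumerFactory<String, String> consumerFactoryMock = mock(KafkaConsumerFactory.class);
    when(consumerFactoryMock.createConsumer(any(KafkaSpoutConfig.class)))
        .thenReturn(consumerSpy)                  //first createConsumer call, on open/activate
        .thenReturn(postReactivationConsumerSpy); //second call, after deactivate/activate

spout.deactivate() commits through the first spy; spout.activate() then asks the factory again and receives the second.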
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 1033e83..29d2a22 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -15,12 +15,12 @@
*/
package org.apache.storm.kafka.spout;
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.hasKey;
import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@@ -31,10 +31,12 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-
+import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -52,12 +54,10 @@ import org.mockito.Captor;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
public class KafkaSpoutRebalanceTest {
@Captor
@@ -85,30 +85,24 @@ public class KafkaSpoutRebalanceTest {
}
//Returns messageIds in order of emission
- private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
- //Setup spout with mock consumer so we can get at the rebalance listener
+ private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture) {
+ //Setup spout with mock consumer so we can get at the rebalance listener
spout.open(conf, contextMock, collectorMock);
spout.activate();
- ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
- verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
//Assign partitions to the spout
ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
- List<TopicPartition> assignedPartitions = new ArrayList<>();
+ Set<TopicPartition> assignedPartitions = new HashSet<>();
assignedPartitions.add(partitionThatWillBeRevoked);
assignedPartitions.add(assignedPartition);
consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+ when(consumerMock.assignment()).thenReturn(assignedPartitions);
//Make the consumer return a single message for each partition
- Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
- firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord<>(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
- Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
- secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord<>(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(firstPartitionRecords))
- .thenReturn(new ConsumerRecords(secondPartitionRecords))
- .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+ .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partitionThatWillBeRevoked, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partitionThatWillBeRevoked, 0, 1))))
+ .thenReturn(new ConsumerRecords<>(Collections.singletonMap(assignedPartition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(assignedPartition, 0, 1))))
+ .thenReturn(new ConsumerRecords<>(new HashMap<TopicPartition, List<ConsumerRecord<String, String>>>()));
//Emit the messages
spout.nextTuple();
@@ -122,6 +116,7 @@ public class KafkaSpoutRebalanceTest {
//Now rebalance
consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
+ when(consumerMock.assignment()).thenReturn(Collections.singleton(assignedPartition));
List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
emittedMessageIds.add(messageIdForRevokedPartition.getValue());
@@ -133,7 +128,12 @@ public class KafkaSpoutRebalanceTest {
public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
//Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
try (SimulatedTime simulatedTime = new SimulatedTime()) {
- KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+ ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ Subscription subscriptionMock = mock(Subscription.class);
+ doNothing()
+ .when(subscriptionMock)
+ .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
+ KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
.setOffsetCommitPeriodMs(offsetCommitPeriodMs)
.build(), consumerFactoryMock);
String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
@@ -141,7 +141,8 @@ public class KafkaSpoutRebalanceTest {
TopicPartition assignedPartition = new TopicPartition(topic, 2);
//Emit a message on each partition and revoke the first partition
- List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
+ spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
//Ack both emitted tuples
spout.ack(emittedMessageIds.get(0));
@@ -163,8 +164,13 @@ public class KafkaSpoutRebalanceTest {
@Test
public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
//Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
+ ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ Subscription subscriptionMock = mock(Subscription.class);
+ doNothing()
+ .when(subscriptionMock)
+ .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
- KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+ KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
.setOffsetCommitPeriodMs(10)
.setRetry(retryServiceMock)
.build(), consumerFactoryMock);
@@ -177,7 +183,8 @@ public class KafkaSpoutRebalanceTest {
.thenReturn(new KafkaSpoutMessageId(assignedPartition, 0));
//Emit a message on each partition and revoke the first partition
- List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
+ spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
//Check that only two message ids were generated
verify(retryServiceMock, times(2)).getMessageId(Mockito.any(ConsumerRecord.class));
@@ -199,7 +206,11 @@ public class KafkaSpoutRebalanceTest {
*/
ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
- KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+ Subscription subscriptionMock = mock(Subscription.class);
+ doNothing()
+ .when(subscriptionMock)
+ .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
+ KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
.build(), consumerFactoryMock);
String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
@@ -209,8 +220,6 @@ public class KafkaSpoutRebalanceTest {
//Setup spout with mock consumer so we can get at the rebalance listener
spout.open(conf, contextMock, collectorMock);
spout.activate();
-
- verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
//Assign partitions to the spout
ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
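
With the listener captured from the mocked Subscription, the test can drive a rebalance by hand instead of standing up a real consumer group. Continuing from the captured captor above, and mimicking Kafka's ordering of revoke-then-assign (topic name and partition numbers here are arbitrary):

// Hand-drive a rebalance against the captured listener: Kafka delivers
// onPartitionsRevoked first, then the new assignment.
ConsumerRebalanceListener listener = rebalanceListenerCapture.getValue();
listener.onPartitionsRevoked(Collections.singleton(new TopicPartition("topic", 1)));
listener.onPartitionsAssigned(Collections.singleton(new TopicPartition("topic", 2)));
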
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index dac4bff..569becf 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -15,103 +15,84 @@
*/
package org.apache.storm.kafka.spout;
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
-public class KafkaSpoutRetryLimitTest {
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+public class KafkaSpoutRetryLimitTest {
+
private final long offsetCommitPeriodMs = 2_000;
private final TopologyContext contextMock = mock(TopologyContext.class);
private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
private final Map<String, Object> conf = new HashMap<>();
private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
private KafkaConsumer<String, String> consumerMock;
- private KafkaSpout<String, String> spout;
- private KafkaSpoutConfig spoutConfig;
-
+ private KafkaSpoutConfig<String, String> spoutConfig;
+
public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
- new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
- 0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
-
- private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) {
- spoutConfig = getKafkaSpoutConfigBuilder(-1)
- .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
- .setRetry(ZERO_RETRIES_RETRY_SERVICE)
- .build();
-
+ new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+ 0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+ .build();
consumerMock = mock(KafkaConsumer.class);
- KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactory<String, String>() {
- @Override
- public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
- return consumerMock;
- }
- };
-
- spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
- spout.open(conf, contextMock, collectorMock);
- spout.activate();
-
- ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
- verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
- //Assign partitions to the spout
- ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
- consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
}
-
+
@Test
public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
//Spout should ack failed messages after they hit the retry limit
try (SimulatedTime simulatedTime = new SimulatedTime()) {
- setupSpoutWithNoRetry(Collections.singleton(partition));
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
- List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
int lastOffset = 3;
- for (int i = 0; i <= lastOffset; i++) {
- recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
- }
- records.put(partition, recordsForPartition);
-
+ int numRecords = lastOffset + 1;
+ records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, numRecords));
+
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(records));
-
- for (int i = 0; i < recordsForPartition.size(); i++) {
+ .thenReturn(new ConsumerRecords<>(records));
+
+ for (int i = 0; i < numRecords; i++) {
spout.nextTuple();
}
-
+
ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
-
+ verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), messageIds.capture());
+
for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
spout.fail(messageId);
}
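
The ZERO_RETRIES_RETRY_SERVICE driving this test is just the stock exponential-backoff service with maxRetries set to 0, so the very first fail() exhausts the budget and the spout acks the tuple back to Kafka as completed. Spelled out with the parameter roles as I read them from the constructor used in this same diff:

KafkaSpoutRetryService noRetries = new KafkaSpoutRetryExponentialBackoff(
    KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),       // initial delay
    KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),  // backoff delay period
    0,                                                               // max retries: give up on first fail
    KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0)); // max delay cap
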
@@ -119,16 +100,15 @@ public class KafkaSpoutRetryLimitTest {
// Advance time and then trigger call to kafka consumer commit
Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
spout.nextTuple();
-
- ArgumentCaptor<Map> committedOffsets=ArgumentCaptor.forClass(Map.class);
+
InOrder inOrder = inOrder(consumerMock);
- inOrder.verify(consumerMock).commitSync(committedOffsets.capture());
+ inOrder.verify(consumerMock).commitSync(commitCapture.capture());
inOrder.verify(consumerMock).poll(anyLong());
//verify that offset 4 was committed for the given TopicPartition, since processing should resume at 4.
- assertTrue(committedOffsets.getValue().containsKey(partition));
- assertEquals(lastOffset + 1, ((OffsetAndMetadata) (committedOffsets.getValue().get(partition))).offset());
+ assertTrue(commitCapture.getValue().containsKey(partition));
+ assertEquals(lastOffset + 1, ((OffsetAndMetadata) (commitCapture.getValue().get(partition))).offset());
}
}
-
-}
\ No newline at end of file
+
+}
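
The final two assertions encode Kafka's commit convention: the committed offset is the position to resume from, i.e. last processed offset + 1, not the last processed offset itself. With offsets 0 through 3 emitted and retry-exhausted, the spout therefore commits 4. In miniature, against a plain consumer (commitUpTo is an illustrative helper, not spout code):

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitConventionSketch {
    // Commits "next offset to read" for a partition whose records up to
    // lastProcessed are fully handled; a consumer restarting from this
    // commit resumes at lastProcessed + 1.
    static void commitUpTo(KafkaConsumer<String, String> consumer,
            TopicPartition partition, long lastProcessed) {
        Map<TopicPartition, OffsetAndMetadata> toCommit = Collections.singletonMap(
                partition, new OffsetAndMetadata(lastProcessed + 1));
        consumer.commitSync(toCommit);
    }
}
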