You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/03 14:41:28 UTC
[10/40] storm git commit: STORM-2343: Fix new Kafka spout stopping
processing if more than maxUncommittedOffsets tuples fail at once
STORM-2343: Fix new Kafka spout stopping processing if more than maxUncommittedOffsets tuples fail at once
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fe16fd40
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fe16fd40
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fe16fd40
Branch: refs/heads/1.1.x-branch
Commit: fe16fd40c69d872e0a2cd5a5dd0bc39fbf72cd9d
Parents: 29c349d
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Sat Apr 8 08:26:42 2017 +0200
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 29 16:29:34 2017 +0900
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 26 +-
.../storm/kafka/spout/KafkaSpoutConfig.java | 10 +-
.../KafkaSpoutRetryExponentialBackoff.java | 78 ++++--
.../kafka/spout/KafkaSpoutRetryService.java | 10 +-
.../storm/kafka/spout/KafkaSpoutEmitTest.java | 256 +++++++++++++++++++
.../kafka/spout/KafkaSpoutRebalanceTest.java | 11 +-
.../kafka/spout/MaxUncommittedOffsetTest.java | 245 ++++++++++++++++++
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 45 +++-
.../SingleTopicKafkaSpoutConfiguration.java | 19 +-
9 files changed, 633 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 207ba23..0bfcfea 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -79,7 +80,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private transient Map<TopicPartition, OffsetManager> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, or after a consumer rebalance, or during close/deactivate
private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
- private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed. Not used if it's AutoCommitMode
+ private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled.
private transient Timer refreshSubscriptionTimer; // Triggers when a subscription should be refreshed
private transient TopologyContext context;
@@ -250,9 +251,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private boolean poll() {
final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
- final boolean poll = !waitingToEmit()
- && (numUncommittedOffsets < maxUncommittedOffsets || consumerAutoCommitMode);
-
+ final int readyMessageCount = retryService.readyMessageCount();
+ final boolean poll = !waitingToEmit() &&
+ //Check that the number of uncommitted, nonretriable tuples is less than the maxUncommittedOffsets limit
+ //Accounting for retriable tuples this way still guarantees that the limit is followed on a per partition basis, and prevents locking up the spout when there are too many retriable tuples
+ (numUncommittedOffsets - readyMessageCount < maxUncommittedOffsets ||
+ consumerAutoCommitMode);
+
if (!poll) {
if (waitingToEmit()) {
LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
@@ -290,15 +295,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
private void doSeekRetriableTopicPartitions() {
- final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
+ final Map<TopicPartition, Long> retriableTopicPartitions = retryService.earliestRetriableOffsets();
- for (TopicPartition rtp : retriableTopicPartitions) {
- final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
- if (offsetAndMeta != null) {
- kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
- } else {
- kafkaConsumer.seek(rtp, acked.get(rtp).getCommittedOffset() + 1); // Seek to last committed offset
- }
+ for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : retriableTopicPartitions.entrySet()) {
+ //Seek directly to the earliest retriable message for each retriable topic partition
+ kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
}
}
@@ -318,7 +319,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
-
if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) { // has been acked
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 920dca9..5f8071f 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -269,12 +269,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
/**
* The maximum number of records a poll will return.
- * Will only work with Kafka 0.10.0 and above.
*/
public Builder<K,V> setMaxPollRecords(int records) {
- //to avoid issues with 0.9 versions that technically still work
- // with this we do not use ConsumerConfig.MAX_POLL_RECORDS_CONFIG
- return setProp("max.poll.records", records);
+ return setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);
}
//Security Related Configs
@@ -330,11 +327,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this.offsetCommitPeriodMs = offsetCommitPeriodMs;
return this;
}
-
+
/**
* Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
* Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
- * of pending offsets bellow the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
+ * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
+ * Note that this limit can in some cases be exceeded, but no partition will exceed this limit by more than maxPollRecords - 1.
* @param maxUncommittedOffsets max number of records that can be be pending commit
*/
public Builder<K,V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index 2c8d7e4..2584685 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -25,15 +25,18 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import org.apache.storm.utils.Time;
/**
* Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
- * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
+ * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1) where failCount = 1, 2, 3, ...
* nextRetry = Min(nextRetry, currentTime + maxDelay)
*/
public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService {
@@ -54,7 +57,14 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
@Override
public int compare(RetrySchedule entry1, RetrySchedule entry2) {
- return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
+ int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
+
+ if(result == 0) {
+ //TreeSet uses compareTo instead of equals() for the Set contract
+ //Ensure that we can save two retry schedules with the same timestamp; Integer.compare avoids the int overflow of subtracting hash codes
+ result = Integer.compare(entry1.hashCode(), entry2.hashCode());
+ }
+ return result;
}
}
@@ -62,13 +72,13 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
private final KafkaSpoutMessageId msgId;
private long nextRetryTimeNanos;
- public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
+ public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTimeNanos) {
this.msgId = msgId;
- this.nextRetryTimeNanos = nextRetryTime;
+ this.nextRetryTimeNanos = nextRetryTimeNanos;
LOG.debug("Created {}", this);
}
- public void setNextRetryTime() {
+ public void setNextRetryTimeNanos() {
nextRetryTimeNanos = nextTime(msgId);
LOG.debug("Updated {}", this);
}
@@ -81,7 +91,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
public String toString() {
return "RetrySchedule{" +
"msgId=" + msgId +
- ", nextRetryTime=" + nextRetryTimeNanos +
+ ", nextRetryTimeNanos=" + nextRetryTimeNanos +
'}';
}
@@ -96,19 +106,19 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
public static class TimeInterval implements Serializable {
private final long lengthNanos;
- private final long length;
private final TimeUnit timeUnit;
+ private final long length;
/**
* @param length length of the time interval in the units specified by {@link TimeUnit}
* @param timeUnit unit used to specify a time interval on which to specify a time unit
*/
public TimeInterval(long length, TimeUnit timeUnit) {
- this.length = length;
- this.timeUnit = timeUnit;
this.lengthNanos = timeUnit.toNanos(length);
+ this.timeUnit = timeUnit;
+ this.length = length;
}
-
+
public static TimeInterval seconds(long length) {
return new TimeInterval(length, TimeUnit.SECONDS);
}
@@ -116,19 +126,15 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
public static TimeInterval milliSeconds(long length) {
return new TimeInterval(length, TimeUnit.MILLISECONDS);
}
-
+
public static TimeInterval microSeconds(long length) {
return new TimeInterval(length, TimeUnit.MICROSECONDS);
}
-
+
public long lengthNanos() {
return lengthNanos;
}
-
- public long length() {
- return length;
- }
-
+
public TimeUnit timeUnit() {
return timeUnit;
}
@@ -165,26 +171,32 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
}
@Override
- public Set<TopicPartition> retriableTopicPartitions() {
- final Set<TopicPartition> tps = new HashSet<>();
- final long currentTimeNanos = System.nanoTime();
+ public Map<TopicPartition, Long> earliestRetriableOffsets() {
+ final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>();
+ final long currentTimeNanos = Time.nanoTime();
for (RetrySchedule retrySchedule : retrySchedules) {
if (retrySchedule.retry(currentTimeNanos)) {
final KafkaSpoutMessageId msgId = retrySchedule.msgId;
- tps.add(new TopicPartition(msgId.topic(), msgId.partition()));
+ final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition());
+ final Long currentLowestOffset = tpToEarliestRetriableOffset.get(tpForMessage);
+ if(currentLowestOffset != null) {
+ tpToEarliestRetriableOffset.put(tpForMessage, Math.min(currentLowestOffset, msgId.offset()));
+ } else {
+ tpToEarliestRetriableOffset.put(tpForMessage, msgId.offset());
+ }
} else {
break; // Stop searching as soon as passed current time
}
}
- LOG.debug("Topic partitions with entries ready to be retried [{}] ", tps);
- return tps;
+ LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset);
+ return tpToEarliestRetriableOffset;
}
@Override
public boolean isReady(KafkaSpoutMessageId msgId) {
boolean retry = false;
if (toRetryMsgs.contains(msgId)) {
- final long currentTimeNanos = System.nanoTime();
+ final long currentTimeNanos = Time.nanoTime();
for (RetrySchedule retrySchedule : retrySchedules) {
if (retrySchedule.retry(currentTimeNanos)) {
if (retrySchedule.msgId.equals(msgId)) {
@@ -265,13 +277,27 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
return true;
}
}
+
+ @Override
+ public int readyMessageCount() {
+ int count = 0;
+ final long currentTimeNanos = Time.nanoTime();
+ for (RetrySchedule retrySchedule : retrySchedules) {
+ if (retrySchedule.retry(currentTimeNanos)) {
+ ++count;
+ } else {
+ break; //Stop counting when past current time
+ }
+ }
+ return count;
+ }
// if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
private long nextTime(KafkaSpoutMessageId msgId) {
- final long currentTimeNanos = System.nanoTime();
+ final long currentTimeNanos = Time.nanoTime();
final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ...
? currentTimeNanos + initialDelay.lengthNanos
- : (currentTimeNanos + delayPeriod.timeUnit.toNanos((long) Math.pow(delayPeriod.length, msgId.numFails() - 1)));
+ : currentTimeNanos + (long) Math.min(delayPeriod.lengthNanos * Math.pow(2, msgId.numFails() - 1), maxDelay.lengthNanos);
return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index bf17a5a..f0230c3 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition;
import java.io.Serializable;
import java.util.Collection;
+import java.util.Map;
import java.util.Set;
/**
@@ -54,10 +55,11 @@ public interface KafkaSpoutRetryService extends Serializable {
boolean retainAll(Collection<TopicPartition> topicPartitions);
/**
- * @return set of topic partitions that have offsets that are ready to be retried, i.e.,
- * for which a tuple has failed and has retry time less than current time
+ * @return The earliest retriable offset for each TopicPartition that has
+ * offsets ready to be retried, i.e. for which a tuple has failed
+ * and has retry time less than current time
*/
- Set<TopicPartition> retriableTopicPartitions();
+ Map<TopicPartition, Long> earliestRetriableOffsets();
/**
* Checks if a specific failed {@link KafkaSpoutMessageId} is ready to be retried,
@@ -75,4 +77,6 @@ public interface KafkaSpoutRetryService extends Serializable {
* Returns false is this message is not scheduled for retrial
*/
boolean isScheduled(KafkaSpoutMessageId msgId);
+
+ int readyMessageCount();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
new file mode 100755
index 0000000..447f8c4
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.never;
+
+import java.util.HashSet;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.mockito.InOrder;
+
+public class KafkaSpoutEmitTest {
+
+ private final long offsetCommitPeriodMs = 2_000;
+ private final TopologyContext contextMock = mock(TopologyContext.class);
+ private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+ private final Map<String, Object> conf = new HashMap<>();
+ private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+ private KafkaConsumer<String, String> consumerMock;
+ private KafkaSpout<String, String> spout;
+ private KafkaSpoutConfig spoutConfig;
+
+ private void setupSpout(Set<TopicPartition> assignedPartitions) {
+ spoutConfig = getKafkaSpoutConfigBuilder(-1)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .build();
+
+ consumerMock = mock(KafkaConsumer.class);
+ KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactory<String, String>() {
+ @Override
+ public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
+ return consumerMock;
+ }
+ };
+
+ //Set up a spout listening to 1 topic partition
+ spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+ spout.open(conf, contextMock, collectorMock);
+ spout.activate();
+
+ ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+ //Assign partitions to the spout
+ ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+ consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+ }
+
+ @Test
+ public void testNextTupleEmitsAtMostOneTuple() {
+ //The spout should emit at most one message per call to nextTuple
+ //This is necessary for Storm to be able to throttle the spout according to maxSpoutPending
+ setupSpout(Collections.singleton(partition));
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+ List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ }
+ records.put(partition, recordsForPartition);
+
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords(records));
+
+ spout.nextTuple();
+
+ verify(collectorMock, times(1)).emit(anyString(), anyList(), anyObject());
+ }
+
+ @Test
+ public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() {
+ //The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded
+
+ //Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ setupSpout(Collections.singleton(partition));
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+ List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+ for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
+ //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+ recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ }
+ records.put(partition, recordsForPartition);
+
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords(records));
+
+ for (int i = 0; i < recordsForPartition.size(); i++) {
+ spout.nextTuple();
+ }
+
+ ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
+
+ for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+ spout.fail(messageId);
+ }
+
+ reset(collectorMock);
+
+ Time.advanceTime(50);
+ //No backoff for test retry service, just check that messages will retry immediately
+ for (int i = 0; i < recordsForPartition.size(); i++) {
+ spout.nextTuple();
+ }
+
+ ArgumentCaptor<KafkaSpoutMessageId> retryMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), retryMessageIds.capture());
+
+ //Verify that the poll started at the earliest retriable tuple offset
+ List<Long> failedOffsets = new ArrayList<>();
+ for(KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+ failedOffsets.add(msgId.offset());
+ }
+ InOrder inOrder = inOrder(consumerMock);
+ inOrder.verify(consumerMock).seek(partition, failedOffsets.get(0));
+ inOrder.verify(consumerMock).poll(anyLong());
+ }
+ }
+
+ @Test
+ public void testNextTupleEmitsAtMostMaxUncommittedOffsetsPlusMaxPollRecordsWhenRetryingTuples() {
+ /*
+ The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded.
+ numUncommittedOffsets is equal to numNonRetriableEmittedTuples + numRetriableTuples.
+ The spout will only emit if numUncommittedOffsets - numRetriableTuples < maxUncommittedOffsets (i.e. numNonRetriableEmittedTuples < maxUncommittedOffsets)
+ This means that the latest offset a poll can start at for a retriable partition,
+ counting from the last committed offset, is maxUncommittedOffsets,
+ where there are maxUncommittedOffsets - 1 uncommitted tuples "to the left".
+ If the retry poll starts at that offset, it at most emits the retried tuple plus maxPollRecords - 1 new tuples.
+ The limit on uncommitted offsets for one partition is therefore maxUncommittedOffsets + maxPollRecords - 1.
+
+ It is only necessary to test this for a single partition, because partitions can't contribute negatively to numNonRetriableEmittedTuples,
+ so if the limit holds for one partition, it will also hold for each individual partition when multiple are involved.
+
+ This makes the actual limit numPartitions * (maxUncommittedOffsets + maxPollRecords - 1)
+ */
+
+ //Emit maxUncommittedOffsets messages, and fail only the last. Then ensure that the spout will allow no more than maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets when retrying
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ setupSpout(Collections.singleton(partition));
+
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPollRecords = new HashMap<>();
+ List<ConsumerRecord<String, String>> firstPollRecordsForPartition = new ArrayList<>();
+ for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
+ //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+ firstPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ }
+ firstPollRecords.put(partition, firstPollRecordsForPartition);
+
+ int maxPollRecords = 5;
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPollRecords = new HashMap<>();
+ List<ConsumerRecord<String, String>> secondPollRecordsForPartition = new ArrayList<>();
+ for(int i = 0; i < maxPollRecords; i++) {
+ secondPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
+ }
+ secondPollRecords.put(partition, secondPollRecordsForPartition);
+
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords(firstPollRecords))
+ .thenReturn(new ConsumerRecords(secondPollRecords));
+
+ for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() + maxPollRecords; i++) {
+ spout.nextTuple();
+ }
+
+ ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock, times(firstPollRecordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
+
+ KafkaSpoutMessageId failedMessageId = messageIds.getAllValues().get(messageIds.getAllValues().size() - 1);
+ spout.fail(failedMessageId);
+
+ reset(collectorMock);
+
+ //Now make the single failed tuple retriable
+ Time.advanceTime(50);
+ //The spout should allow another poll since there are now only maxUncommittedOffsets - 1 nonretriable tuples
+ for (int i = 0; i < firstPollRecordsForPartition.size() + maxPollRecords; i++) {
+ spout.nextTuple();
+ }
+
+ ArgumentCaptor<KafkaSpoutMessageId> retryBatchMessageIdsCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock, times(maxPollRecords)).emit(anyString(), anyList(), retryBatchMessageIdsCaptor.capture());
+ reset(collectorMock);
+
+ //Check that the consumer started polling at the failed tuple offset
+ InOrder inOrder = inOrder(consumerMock);
+ inOrder.verify(consumerMock).seek(partition, failedMessageId.offset());
+ inOrder.verify(consumerMock).poll(anyLong());
+
+ //Now fail all except one of the last batch, and check that the spout won't reemit any tuples because there are more than maxUncommittedOffsets nonretriable tuples
+ Time.advanceTime(50);
+ List<KafkaSpoutMessageId> retryBatchMessageIds = retryBatchMessageIdsCaptor.getAllValues();
+ KafkaSpoutMessageId firstTupleFromRetryBatch = retryBatchMessageIds.remove(0);
+ for(KafkaSpoutMessageId msgId : retryBatchMessageIds) {
+ spout.fail(msgId);
+ }
+ for (int i = 0; i < firstPollRecordsForPartition.size() + maxPollRecords; i++) {
+ spout.nextTuple();
+ }
+ verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
+
+ //Fail the last tuple, which brings the number of nonretriable tuples back under the limit, and check that the spout polls again
+ spout.fail(firstTupleFromRetryBatch);
+ spout.nextTuple();
+ verify(collectorMock, times(1)).emit(anyString(), anyList(), anyObject());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 6a0a63e..81e3807 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -15,7 +15,7 @@
*/
package org.apache.storm.kafka.spout;
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.hasKey;
import static org.junit.Assert.assertThat;
@@ -126,7 +126,9 @@ public class KafkaSpoutRebalanceTest {
public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
//Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
try (SimulatedTime simulatedTime = new SimulatedTime()) {
- KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, this.offsetCommitPeriodMs), consumerFactoryMock);
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .build(), consumerFactoryMock);
String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
TopicPartition assignedPartition = new TopicPartition(topic, 2);
@@ -155,7 +157,10 @@ public class KafkaSpoutRebalanceTest {
public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
//Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
- KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10, retryServiceMock), consumerFactoryMock);
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+ .setOffsetCommitPeriodMs(10)
+ .setRetry(retryServiceMock)
+ .build(), consumerFactoryMock);
String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
TopicPartition assignedPartition = new TopicPartition(topic, 2);
http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
new file mode 100755
index 0000000..a7ad4c2
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.hamcrest.CoreMatchers.everyItem;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.isIn;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.kafka.KafkaUnitRule;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+/**
+ * Verifies how {@link KafkaSpout} enforces {@code maxUncommittedOffsets}: emitting new records must stop once
+ * the cap is reached, resume after a commit frees up room, and stay stopped while acked offsets cannot be
+ * committed. Failed tuples must still be retried, while the number of uncommitted offsets stays bounded by
+ * {@code maxUncommittedOffsets + maxPollRecords - 1}.
+ */
+public class MaxUncommittedOffsetTest {
+
+    @Rule
+    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+    private final TopologyContext topologyContext = mock(TopologyContext.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+    private final long commitOffsetPeriodMs = 2_000;
+    private final int numMessages = 100;
+    private final int maxUncommittedOffsets = 10;
+    private final int maxPollRecords = 5;
+    private final int initialRetryDelaySecs = 60;
+    private final KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+        .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+        .setMaxPollRecords(maxPollRecords)
+        .setMaxUncommittedOffsets(maxUncommittedOffsets)
+        //Retry once after a minute
+        .setRetry(new KafkaSpoutRetryExponentialBackoff(
+            KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs),
+            KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+            1,
+            KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs)))
+        .build();
+    private KafkaSpout<String, String> spout;
+
+    @Before
+    public void setUp() {
+        //This is because the tests are checking that a hard cap of maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets exists
+        //so Kafka must be able to return more messages than that in order for the tests to be meaningful
+        assertThat("Current tests require numMessages >= 2*maxUncommittedOffsets", numMessages, greaterThanOrEqualTo(maxUncommittedOffsets * 2));
+        //This is to verify that a low maxPollRecords does not interfere with reemitting failed tuples
+        //The spout must be able to reemit all retriable tuples, even if the maxPollRecords is set to a low value compared to maxUncommittedOffsets.
+        assertThat("Current tests require maxPollRecords <= maxUncommittedOffsets", maxPollRecords, lessThanOrEqualTo(maxUncommittedOffsets));
+        this.spout = new KafkaSpout<>(spoutConfig);
+    }
+
+    /**
+     * Creates {@code topicName} on the Kafka instance managed by {@link #kafkaUnitRule} and sends {@code msgCount}
+     * records whose key and value are both the record's index as a string.
+     */
+    private void populateTopicData(String topicName, int msgCount) throws Exception {
+        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
+
+        for (int i = 0; i < msgCount; i++) {
+            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
+                topicName, Integer.toString(i),
+                Integer.toString(i));
+
+            kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
+        }
+    }
+
+    /** Sends {@code msgCount} records to the test topic, then opens and activates the spout. */
+    private void initializeSpout(int msgCount) throws Exception {
+        populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+        spout.open(conf, topologyContext, collector);
+        spout.activate();
+    }
+
+    /**
+     * Initializes the spout with {@code messageCount} records, calls {@code nextTuple} once per record and verifies
+     * that exactly {@code maxUncommittedOffsets} tuples were emitted.
+     *
+     * @return a captor holding the message ids of the emitted tuples, in emit order
+     */
+    private ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) throws Exception {
+        assertThat("The message count is less than maxUncommittedOffsets. This test is not meaningful with this configuration.", messageCount, greaterThanOrEqualTo(maxUncommittedOffsets));
+        //The spout must respect maxUncommittedOffsets when requesting/emitting tuples
+        initializeSpout(messageCount);
+
+        //Try to emit all messages. Ensure only maxUncommittedOffsets are emitted
+        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        for (int i = 0; i < messageCount; i++) {
+            spout.nextTuple();
+        }
+        verify(collector, times(maxUncommittedOffsets)).emit(
+            anyString(),
+            anyList(),
+            messageIds.capture());
+        return messageIds;
+    }
+
+    /** Returns the Kafka offsets of all message ids captured by {@code messageIds}, in capture order. */
+    private static List<Long> offsetsOf(ArgumentCaptor<KafkaSpoutMessageId> messageIds) {
+        List<Long> offsets = new ArrayList<>();
+        for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+            offsets.add(msgId.offset());
+        }
+        return offsets;
+    }
+
+    @Test
+    public void testNextTupleCanEmitMoreMessagesWhenDroppingBelowMaxUncommittedOffsetsDueToCommit() throws Exception {
+        //The spout must respect maxUncommittedOffsets after committing a set of records
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+            //First check that maxUncommittedOffsets is respected when emitting from scratch
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+            reset(collector);
+
+            //Ack all emitted messages and commit them
+            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+                spout.ack(messageId);
+            }
+            //Advance past the commit period so the following nextTuple call commits the acked offsets
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            spout.nextTuple();
+
+            //Now check that the spout will emit another maxUncommittedOffsets messages
+            for (int i = 0; i < numMessages; i++) {
+                spout.nextTuple();
+            }
+            verify(collector, times(maxUncommittedOffsets)).emit(
+                anyString(),
+                anyList(),
+                anyObject());
+        }
+    }
+
+    @Test
+    public void testNextTupleWillRespectMaxUncommittedOffsetsWhenThereAreAckedUncommittedTuples() throws Exception {
+        //The spout must respect maxUncommittedOffsets even if some tuples have been acked but not committed
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+            //First check that maxUncommittedOffsets is respected when emitting from scratch
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+            reset(collector);
+
+            //Fail all emitted messages except the last one. Try to commit.
+            //The acked offset is not committable because the failed offsets before it are still uncommitted
+            List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
+            for (int i = 0; i < messageIdList.size() - 1; i++) {
+                spout.fail(messageIdList.get(i));
+            }
+            spout.ack(messageIdList.get(messageIdList.size() - 1));
+            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            spout.nextTuple();
+
+            //Now check that the spout will not emit anything else since nothing has been committed
+            for (int i = 0; i < numMessages; i++) {
+                spout.nextTuple();
+            }
+
+            verify(collector, times(0)).emit(
+                anyString(),
+                anyList(),
+                anyObject());
+        }
+    }
+
+    /**
+     * Fails every captured tuple except the first, acks the first, then advances time past the commit period
+     * and calls {@code nextTuple} so the spout can commit the acked offset.
+     */
+    private void failAllExceptTheFirstMessageThenCommit(ArgumentCaptor<KafkaSpoutMessageId> messageIds) {
+        //Fail all emitted messages except the first. Commit the first.
+        List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
+        for (int i = 1; i < messageIdList.size(); i++) {
+            spout.fail(messageIdList.get(i));
+        }
+        spout.ack(messageIdList.get(0));
+        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+        spout.nextTuple();
+    }
+
+    @Test
+    public void testNextTupleWillNotEmitMoreThanMaxUncommittedOffsetsPlusMaxPollRecordsMessages() throws Exception {
+        //The upper bound on uncommitted offsets should be maxUncommittedOffsets + maxPollRecords - 1
+        //This is reachable by emitting maxUncommittedOffsets messages, acking the first message, then polling.
+        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+            //First check that maxUncommittedOffsets is respected when emitting from scratch
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+            reset(collector);
+
+            failAllExceptTheFirstMessageThenCommit(messageIds);
+
+            //Offset 0 is acked, 1 to maxUncommittedOffsets - 1 are failed
+            //The spout should now emit another maxPollRecords messages
+            //This is allowed because the acked message brings the numUncommittedOffsets below the cap
+            for (int i = 0; i < maxUncommittedOffsets; i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<KafkaSpoutMessageId> secondRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collector, times(maxPollRecords)).emit(
+                anyString(),
+                anyList(),
+                secondRunMessageIds.capture());
+            reset(collector);
+
+            List<Long> firstRunOffsets = offsetsOf(messageIds);
+            List<Long> secondRunOffsets = offsetsOf(secondRunMessageIds);
+            assertThat("Expected the newly emitted messages to have no overlap with the first batch",
+                Collections.disjoint(firstRunOffsets, secondRunOffsets), is(true));
+
+            //Offset 0 is acked, 1 to maxUncommittedOffsets-1 are failed, maxUncommittedOffsets to maxUncommittedOffsets + maxPollRecords-1 are emitted
+            //There are now maxUncommittedOffsets-1 + maxPollRecords records emitted past the last committed offset
+            //Advance time so the failed tuples become ready for retry, and check that the spout will emit retriable tuples as long as numNonRetriableEmittedTuples < maxUncommittedOffsets
+
+            int numNonRetriableEmittedTuples = maxPollRecords; //The other tuples were failed and are becoming retriable
+            int allowedPolls = (int) Math.ceil((maxUncommittedOffsets - numNonRetriableEmittedTuples) / (double) maxPollRecords);
+            Time.advanceTimeSecs(initialRetryDelaySecs);
+            for (int i = 0; i < numMessages; i++) {
+                spout.nextTuple();
+            }
+            ArgumentCaptor<KafkaSpoutMessageId> thirdRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collector, times(allowedPolls * maxPollRecords)).emit(
+                anyString(),
+                anyList(),
+                thirdRunMessageIds.capture());
+            reset(collector);
+
+            List<Long> thirdRunOffsets = offsetsOf(thirdRunMessageIds);
+            assertThat("Expected the emitted messages to be retries of the failed tuples from the first batch", thirdRunOffsets, everyItem(isIn(firstRunOffsets)));
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 1121642..884709d 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -17,7 +17,8 @@
*/
package org.apache.storm.kafka.spout;
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -37,6 +38,7 @@ import java.util.concurrent.TimeoutException;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -46,6 +48,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
@@ -76,7 +81,9 @@ public class SingleTopicKafkaSpoutTest {
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- KafkaSpoutConfig spoutConfig = getKafkaSpoutConfig(kafkaUnitRule.getKafkaUnit().getKafkaPort(), commitOffsetPeriodMs);
+ KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+ .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+ .build();
this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
this.consumerFactory = new KafkaConsumerFactory<String, String>() {
@Override
@@ -275,4 +282,38 @@ public class SingleTopicKafkaSpoutTest {
verifyAllMessagesCommitted(messageCount);
}
}
+
+    @Test
+    public void shouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception {
+        //Failed tuples must be replayed even if they fail out of order, and replays must skip already-emitted offsets (retries included)
+        final int totalMessages = 10;
+        initializeSpout(totalMessages);
+
+        //Emit every message once and capture the message ids in emit order
+        for (int call = 0; call < totalMessages; call++) {
+            spout.nextTuple();
+        }
+        ArgumentCaptor<KafkaSpoutMessageId> firstPassIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collector, times(totalMessages)).emit(anyString(), anyList(), firstPassIds.capture());
+        reset(collector);
+
+        //Fail tuples 5 and 3, let the spout run once, then fail tuple 2
+        List<KafkaSpoutMessageId> emitted = firstPassIds.getAllValues();
+        spout.fail(emitted.get(5));
+        spout.fail(emitted.get(3));
+        spout.nextTuple();
+        spout.fail(emitted.get(2));
+
+        //Exactly the three failed tuples, and nothing else, must be re-emitted
+        for (int call = 0; call < totalMessages; call++) {
+            spout.nextTuple();
+        }
+        ArgumentCaptor<KafkaSpoutMessageId> replayIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collector, times(3)).emit(anyString(), anyList(), replayIds.capture());
+        Set<KafkaSpoutMessageId> expectedReplays = new HashSet<>();
+        for (int failedIndex : new int[]{5, 3, 2}) {
+            expectedReplays.add(emitted.get(failedIndex));
+        }
+        assertThat("Expected reemits to be the 3 failed tuples", new HashSet<>(replayIds.getAllValues()), is(expectedReplays));
+    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fe16fd40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 99bd3de..d5c052b 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -46,19 +46,11 @@ public class SingleTopicKafkaSpoutConfiguration {
public static StormTopology getTopologyKafkaSpout(int port) {
final TopologyBuilder tp = new TopologyBuilder();
- tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(port)), 1);
+ tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfigBuilder(port).build()), 1); //Spout uses the unmodified default test configuration, parallelism hint 1
tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
return tp.createTopology();
}
- static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(int port) {
- return getKafkaSpoutConfig(port, 10_000);
- }
-
- static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(int port, long offsetCommitPeriodMs) {
- return getKafkaSpoutConfig(port, offsetCommitPeriodMs, getRetryService());
- }
-
private static Func<ConsumerRecord<String, String>, List<Object>> TOPIC_KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
@Override
public List<Object> apply(ConsumerRecord<String, String> r) {
@@ -66,18 +58,17 @@ public class SingleTopicKafkaSpoutConfiguration {
}
};
- static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
+ public static KafkaSpoutConfig.Builder<String,String> getKafkaSpoutConfigBuilder(int port) { //Returns an unbuilt builder holding the shared test defaults so callers can override settings (e.g. setOffsetCommitPeriodMs, setRetry) before build()
return KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)
.setRecordTranslator(TOPIC_KEY_VALUE_FUNC,
new Fields("topic", "key", "value"), STREAM)
.setGroupId("kafkaSpoutTestGroup")
.setMaxPollRecords(5)
- .setRetry(retryService)
- .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .setRetry(getRetryService()) //Default retry service; tests replace it via setRetry(...)
+ .setOffsetCommitPeriodMs(10_000) //Same 10s default the removed getKafkaSpoutConfig(int) overload passed
.setFirstPollOffsetStrategy(EARLIEST)
.setMaxUncommittedOffsets(250)
- .setPollTimeoutMs(1000)
- .build();
+ .setPollTimeoutMs(1000);
}
protected static KafkaSpoutRetryService getRetryService() {