Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/07 21:05:24 UTC
[2/7] storm git commit: STORM-2936 Overwrite latest storm-kafka-client 1.x-branch into 1.1.x-branch
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
new file mode 100644
index 0000000..0bf9219
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.tuple.Values;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.utils.Time;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.verify;
+import static org.junit.Assert.assertEquals;
+
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.hamcrest.Matchers;
+
+public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest {
+ private final int maxPollRecords = 10;
+ private final int maxRetries = 3;
+
+ public KafkaSpoutSingleTopicTest() {
+ super(2_000);
+ }
+
+ @Override
+ KafkaSpoutConfig<String, String> createSpoutConfig() {
+ return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+ KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+ Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)))
+ .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+ .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+ maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
+ .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
+ .build();
+ }
+
+ @Test
+ public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() throws Exception {
+ final int messageCount = maxPollRecords * 2;
+ prepareSpout(messageCount);
+
+ //Emit all messages and fail the first one while acking the rest
+ for (int i = 0; i < messageCount; i++) {
+ spout.nextTuple();
+ }
+ ArgumentCaptor<KafkaSpoutMessageId> messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock, times(messageCount)).emit(anyString(), anyList(), messageIdCaptor.capture());
+ List<KafkaSpoutMessageId> messageIds = messageIdCaptor.getAllValues();
+ for (int i = 1; i < messageIds.size(); i++) {
+ spout.ack(messageIds.get(i));
+ }
+ KafkaSpoutMessageId failedTuple = messageIds.get(0);
+ spout.fail(failedTuple);
+
+ //Advance the time and replay the failed tuple.
+ reset(collectorMock);
+ spout.nextTuple();
+ ArgumentCaptor<KafkaSpoutMessageId> failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(anyString(), anyList(), failedIdReplayCaptor.capture());
+
+ assertThat("Expected replay of failed tuple", failedIdReplayCaptor.getValue(), is(failedTuple));
+
+ /* Ack the tuple, and commit.
+ * Since the tuple is more than max poll records behind the most recent emitted tuple, the consumer won't catch up in this poll.
+ */
+ reset(collectorMock);
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs);
+ spout.ack(failedIdReplayCaptor.getValue());
+ spout.nextTuple();
+ verify(consumerSpy).commitSync(commitCapture.capture());
+
+ Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue();
+ TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+ assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp));
+ assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount));
+
+ /* Verify that the following acked (now committed) tuples are not emitted again
+ * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened,
+ * this verifies that the spout keeps the consumer position ahead of the committed offset when committing
+ */
+ //Just do a few polls to check that nothing more is emitted
+ for(int i = 0; i < 3; i++) {
+ spout.nextTuple();
+ }
+ verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
+ }
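
For context, the invariant the test above pins down is that the spout commits the offset just past the highest contiguously acked record, even when the consumer's position lags behind between polls. A minimal sketch of that arithmetic (a hypothetical helper, not the spout's internals):

    import java.util.SortedSet;
    import java.util.TreeSet;

    class CommitOffsetSketch {
        // One past the last contiguously acked offset, starting from the
        // currently committed offset; this is the value handed to commitSync.
        static long nextCommitOffset(long committedOffset, SortedSet<Long> ackedOffsets) {
            long next = committedOffset;
            while (ackedOffsets.contains(next)) {
                next++;
            }
            return next;
        }

        public static void main(String[] args) {
            SortedSet<Long> acked = new TreeSet<>();
            for (long i = 0; i < 20; i++) {
                acked.add(i); // all 2 * maxPollRecords = 20 records eventually acked
            }
            System.out.println(nextCommitOffset(0, acked)); // 20, i.e. (long) messageCount
        }
    }
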
+
+ @Test
+ public void testShouldContinueWithSlowDoubleAcks() throws Exception {
+ final int messageCount = 20;
+ prepareSpout(messageCount);
+
+ //play 1st tuple
+ ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
+ spout.nextTuple();
+ verify(collectorMock).emit(anyString(), anyList(), messageIdToDoubleAck.capture());
+ spout.ack(messageIdToDoubleAck.getValue());
+
+ //Emit some more messages
+ for(int i = 0; i < messageCount / 2; i++) {
+ spout.nextTuple();
+ }
+
+ spout.ack(messageIdToDoubleAck.getValue());
+
+ //Emit any remaining messages
+ for(int i = 0; i < messageCount; i++) {
+ spout.nextTuple();
+ }
+
+ //Verify that all messages are emitted, ack all the messages
+ ArgumentCaptor<Object> messageIds = ArgumentCaptor.forClass(Object.class);
+ verify(collectorMock, times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+ anyList(),
+ messageIds.capture());
+ for(Object id : messageIds.getAllValues()) {
+ spout.ack(id);
+ }
+
+ Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+ //Commit offsets
+ spout.nextTuple();
+
+ verifyAllMessagesCommitted(messageCount);
+ }
+
+ @Test
+ public void testShouldEmitAllMessages() throws Exception {
+ final int messageCount = 10;
+ prepareSpout(messageCount);
+
+ //Emit all messages and check that they are emitted. Ack the messages too
+ for(int i = 0; i < messageCount; i++) {
+ spout.nextTuple();
+ ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
+ verify(collectorMock).emit(
+ eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+ eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
+ Integer.toString(i),
+ Integer.toString(i))),
+ messageId.capture());
+ spout.ack(messageId.getValue());
+ reset(collectorMock);
+ }
+
+ Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+ //Commit offsets
+ spout.nextTuple();
+
+ verifyAllMessagesCommitted(messageCount);
+ }
+
+ @Test
+ public void testShouldReplayInOrderFailedMessages() throws Exception {
+ final int messageCount = 10;
+ prepareSpout(messageCount);
+
+ //play and ack 1 tuple
+ ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
+ spout.nextTuple();
+ verify(collectorMock).emit(anyString(), anyList(), messageIdAcked.capture());
+ spout.ack(messageIdAcked.getValue());
+ reset(collectorMock);
+
+ //play and fail 1 tuple
+ ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
+ spout.nextTuple();
+ verify(collectorMock).emit(anyString(), anyList(), messageIdFailed.capture());
+ spout.fail(messageIdFailed.getValue());
+ reset(collectorMock);
+
+ //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
+ for(int i = 0; i < messageCount; i++) {
+ spout.nextTuple();
+ }
+
+ ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
+ //All messages except the first acked message should have been emitted
+ verify(collectorMock, times(messageCount - 1)).emit(
+ eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+ anyList(),
+ remainingMessageIds.capture());
+ for(Object id : remainingMessageIds.getAllValues()) {
+ spout.ack(id);
+ }
+
+ Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+ //Commit offsets
+ spout.nextTuple();
+
+ verifyAllMessagesCommitted(messageCount);
+ }
+
+ @Test
+ public void testShouldReplayFirstTupleFailedOutOfOrder() throws Exception {
+ final int messageCount = 10;
+ prepareSpout(messageCount);
+
+ //play 1st tuple
+ ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
+ spout.nextTuple();
+ verify(collectorMock).emit(anyString(), anyList(), messageIdToFail.capture());
+ reset(collectorMock);
+
+ //play 2nd tuple
+ ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
+ spout.nextTuple();
+ verify(collectorMock).emit(anyString(), anyList(), messageIdToAck.capture());
+ reset(collectorMock);
+
+ //ack 2nd tuple
+ spout.ack(messageIdToAck.getValue());
+ //fail 1st tuple
+ spout.fail(messageIdToFail.getValue());
+
+ //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
+ for(int i = 0; i < messageCount; i++) {
+ spout.nextTuple();
+ }
+
+ ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
+ //All messages except the first acked message should have been emitted
+ verify(collectorMock, times(messageCount - 1)).emit(
+ eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+ anyList(),
+ remainingIds.capture());
+ for(Object id : remainingIds.getAllValues()) {
+ spout.ack(id);
+ }
+
+ Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+ //Commit offsets
+ spout.nextTuple();
+
+ verifyAllMessagesCommitted(messageCount);
+ }
+
+ @Test
+ public void testShouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception {
+ //The spout must reemit retriable tuples, even if they fail out of order.
+ //The spout should be able to skip tuples it has already emitted when retrying messages, even if those tuples are also retries.
+ final int messageCount = 10;
+ prepareSpout(messageCount);
+
+ //play all tuples
+ for (int i = 0; i < messageCount; i++) {
+ spout.nextTuple();
+ }
+ ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock, times(messageCount)).emit(anyString(), anyList(), messageIds.capture());
+ reset(collectorMock);
+ //Fail tuple 5 and 3, call nextTuple, then fail tuple 2
+ List<KafkaSpoutMessageId> capturedMessageIds = messageIds.getAllValues();
+ spout.fail(capturedMessageIds.get(5));
+ spout.fail(capturedMessageIds.get(3));
+ spout.nextTuple();
+ spout.fail(capturedMessageIds.get(2));
+
+ //Check that the spout will reemit all 3 failed tuples and no other tuples
+ ArgumentCaptor<KafkaSpoutMessageId> reemittedMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ for (int i = 0; i < messageCount; i++) {
+ spout.nextTuple();
+ }
+ verify(collectorMock, times(3)).emit(anyString(), anyList(), reemittedMessageIds.capture());
+ Set<KafkaSpoutMessageId> expectedReemitIds = new HashSet<>();
+ expectedReemitIds.add(capturedMessageIds.get(5));
+ expectedReemitIds.add(capturedMessageIds.get(3));
+ expectedReemitIds.add(capturedMessageIds.get(2));
+ assertThat("Expected reemits to be the 3 failed tuples", new HashSet<>(reemittedMessageIds.getAllValues()), is(expectedReemitIds));
+ }
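
The behavior verified above amounts to a simple filter: on replay the spout emits only offsets that were emitted before and explicitly failed, never tuples that are still pending. A rough sketch of that selection (illustrative names only, assuming the zero retry delay configured in this test):

    import java.util.LinkedHashSet;
    import java.util.Set;

    class RetryFilterSketch {
        // Reemit only offsets that were both emitted and failed.
        static Set<Long> offsetsToReemit(Set<Long> emitted, Set<Long> failed) {
            Set<Long> retriable = new LinkedHashSet<>(failed);
            retriable.retainAll(emitted);
            return retriable;
        }

        public static void main(String[] args) {
            Set<Long> emitted = new LinkedHashSet<>();
            for (long i = 0; i < 10; i++) {
                emitted.add(i);
            }
            Set<Long> failed = new LinkedHashSet<>();
            failed.add(5L);
            failed.add(3L);
            failed.add(2L);
            System.out.println(offsetsToReemit(emitted, failed)); // [5, 3, 2]
        }
    }
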
+
+ @Test
+ public void testShouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
+ //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted
+ final int messageCount = 1;
+ prepareSpout(messageCount);
+
+ //Emit and fail the same tuple until we've reached retry limit
+ for (int i = 0; i <= maxRetries; i++) {
+ ArgumentCaptor<KafkaSpoutMessageId> messageIdFailed = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ spout.nextTuple();
+ verify(collectorMock).emit(anyString(), anyListOf(Object.class), messageIdFailed.capture());
+ KafkaSpoutMessageId msgId = messageIdFailed.getValue();
+ spout.fail(msgId);
+ assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1));
+ reset(collectorMock);
+ }
+
+ //Verify that the tuple is not emitted again
+ spout.nextTuple();
+ verify(collectorMock, never()).emit(anyString(), anyListOf(Object.class), anyObject());
+ }
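
Worked through: with maxRetries = 3 the loop above emits the tuple maxRetries + 1 = 4 times in total (the initial emit plus three retries) before the spout drops it. A minimal sketch of that gate (hypothetical, mirroring what a retry service decides after each failure):

    class RetryCapSketch {
        // A failed tuple is rescheduled only while its failure count
        // has not exceeded the configured cap.
        static boolean scheduleRetry(int numFails, int maxRetries) {
            return numFails <= maxRetries;
        }

        public static void main(String[] args) {
            int maxRetries = 3;
            for (int numFails = 1; numFails <= maxRetries + 1; numFails++) {
                System.out.println("numFails=" + numFails
                        + " retry=" + scheduleRetry(numFails, maxRetries));
            }
            // numFails 1..3 -> retry=true (three reemits), numFails=4 -> retry=false (dropped)
        }
    }
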
+
+ @Test
+ public void testSpoutMustRefreshPartitionsEvenIfNotPolling() throws Exception {
+ SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
+
+ //Nothing is assigned yet, should emit nothing
+ spout.nextTuple();
+ verify(collectorMock, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+
+ SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+ Time.advanceTime(KafkaSpoutConfig.DEFAULT_PARTITION_REFRESH_PERIOD_MS + KafkaSpout.TIMER_DELAY_MS);
+
+ //The new partition should be discovered and the message should be emitted
+ spout.nextTuple();
+ verify(collectorMock).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+ }
+
+ @Test
+ public void testOffsetMetrics() throws Exception {
+ final int messageCount = 10;
+ prepareSpout(messageCount);
+
+ Map<String, Long> offsetMetric = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+ assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue());
+ // the offset of the last available message + 1.
+ assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue());
+ assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalRecordsInPartitions").longValue());
+ assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue());
+ assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue());
+ //totalSpoutLag = totalLatestTimeOffset-totalLatestCompletedOffset
+ assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue());
+
+ //Emit all messages and check that they are emitted. Ack the messages too
+ for (int i = 0; i < messageCount; i++) {
+ nextTuple_verifyEmitted_ack_resetCollector(i);
+ }
+
+ commitAndVerifyAllMessagesCommitted(messageCount);
+
+ offsetMetric = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+ assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue());
+ assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue());
+ //latest offset
+ assertEquals(9, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue());
+ // offset where processing will resume upon spout restart
+ assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue());
+ assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue());
+ }
+}
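
The metric arithmetic in testOffsetMetrics is internally consistent: before any emits, totalSpoutLag = totalLatestTimeOffset - totalLatestCompletedOffset = 10 - 0 = 10; after all ten messages are acked and committed, totalLatestEmittedOffset is 9 (the last record), totalLatestCompletedOffset is 10 (the offset a restarted spout resumes from), and the lag drops to 0. A plain-arithmetic sketch of the final snapshot:

    class OffsetMetricSketch {
        public static void main(String[] args) {
            long totalLatestTimeOffset = 10;      // offset of the last record + 1
            long totalLatestCompletedOffset = 10; // committed offset once all 10 are acked
            long totalLatestEmittedOffset = 9;    // offset of the last emitted record
            long totalSpoutLag = totalLatestTimeOffset - totalLatestCompletedOffset;
            System.out.println("lag=" + totalSpoutLag
                    + " lastEmitted=" + totalLatestEmittedOffset); // lag=0 lastEmitted=9
        }
    }
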
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
new file mode 100644
index 0000000..a860cef
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.junit.Test;
+
+import java.util.regex.Pattern;
+
+import static org.mockito.Mockito.when;
+
+public class KafkaSpoutTopologyDeployActivateDeactivateTest extends KafkaSpoutAbstractTest {
+
+ public KafkaSpoutTopologyDeployActivateDeactivateTest() {
+ super(2_000);
+ }
+
+ @Override
+ KafkaSpoutConfig<String, String> createSpoutConfig() {
+ return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+ KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+ Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)))
+ .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+ .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
+ .build();
+ }
+
+ @Test
+ public void test_FirstPollStrategy_Earliest_NotEnforced_OnTopologyActivateDeactivate() throws Exception {
+ final int messageCount = 2;
+ prepareSpout(messageCount);
+
+ nextTuple_verifyEmitted_ack_resetCollector(0);
+
+ //Commits offsets during deactivation
+ spout.deactivate();
+
+ verifyAllMessagesCommitted(1);
+
+ consumerSpy = createConsumerSpy();
+
+ spout.activate();
+
+ nextTuple_verifyEmitted_ack_resetCollector(1);
+
+ commitAndVerifyAllMessagesCommitted(messageCount);
+ }
+
+ @Test
+ public void test_FirstPollStrategy_Earliest_NotEnforced_OnPartitionReassignment() throws Exception {
+ when(topologyContext.getStormId()).thenReturn("topology-1");
+
+ final int messageCount = 2;
+ prepareSpout(messageCount);
+
+ nextTuple_verifyEmitted_ack_resetCollector(0);
+
+ //Commits offsets during deactivation
+ spout.deactivate();
+
+ verifyAllMessagesCommitted(1);
+
+ // Restart topology with the same topology id, which mimics the behavior of partition reassignment
+ setUp();
+ // Initialize spout using the same populated data (i.e same kafkaUnitRule)
+ SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
+
+ nextTuple_verifyEmitted_ack_resetCollector(1);
+
+ commitAndVerifyAllMessagesCommitted(messageCount);
+ }
+
+ @Test
+ public void test_FirstPollStrategy_Earliest_Enforced_OnlyOnTopologyDeployment() throws Exception {
+ when(topologyContext.getStormId()).thenReturn("topology-1");
+
+ final int messageCount = 2;
+ prepareSpout(messageCount);
+
+ nextTuple_verifyEmitted_ack_resetCollector(0);
+
+ //Commits offsets during deactivation
+ spout.deactivate();
+
+ verifyAllMessagesCommitted(1);
+
+ // Restart topology with a different topology id
+ setUp();
+ when(topologyContext.getStormId()).thenReturn("topology-2");
+ // Initialize spout using the same populated data (i.e same kafkaUnitRule)
+ SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
+
+ //Emit all messages and check that they are emitted. Ack the messages too
+ for (int i = 0; i < messageCount; i++) {
+ nextTuple_verifyEmitted_ack_resetCollector(i);
+ }
+
+ commitAndVerifyAllMessagesCommitted(messageCount);
+ }
+}
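
Taken together, these three tests pin down when FirstPollOffsetStrategy.EARLIEST actually rewinds: only on the first deployment of a given topology id, not on deactivate/activate cycles or partition reassignment within the same topology id. A sketch of that decision as the tests suggest it (illustrative only, not the spout's code):

    class FirstPollDecisionSketch {
        // EARLIEST (or LATEST) is enforced only when the topology id that
        // committed the offsets differs from the current one, i.e. on a fresh
        // deployment; otherwise the spout resumes from the committed offset.
        static boolean enforceFirstPollStrategy(String committingTopologyId, String currentTopologyId) {
            return !currentTopologyId.equals(committingTopologyId);
        }

        public static void main(String[] args) {
            System.out.println(enforceFirstPollStrategy("topology-1", "topology-1")); // false: resume
            System.out.println(enforceFirstPollStrategy("topology-1", "topology-2")); // true: rewind
        }
    }
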
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index a7ad4c2..b90a49d 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -15,7 +15,7 @@
*/
package org.apache.storm.kafka.spout;
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.hamcrest.CoreMatchers.either;
import static org.hamcrest.CoreMatchers.everyItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -34,7 +34,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.kafka.clients.producer.ProducerRecord;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.KafkaUnitRule;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.spout.SpoutOutputCollector;
@@ -46,6 +47,10 @@ import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.MockitoAnnotations;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.never;
+
public class MaxUncommittedOffsetTest {
@Rule
@@ -59,15 +64,17 @@ public class MaxUncommittedOffsetTest {
private final int maxUncommittedOffsets = 10;
private final int maxPollRecords = 5;
private final int initialRetryDelaySecs = 60;
- private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+ private final KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
.setOffsetCommitPeriodMs(commitOffsetPeriodMs)
- .setMaxPollRecords(maxPollRecords)
+ .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
.setMaxUncommittedOffsets(maxUncommittedOffsets)
.setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
1, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs))) //Retry once after a minute
.build();
private KafkaSpout<String, String> spout;
+
+
@Before
public void setUp() {
//This is because the tests are checking that a hard cap of maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets exists
@@ -77,37 +84,24 @@ public class MaxUncommittedOffsetTest {
//The spout must be able to reemit all retriable tuples, even if the maxPollRecords is set to a low value compared to maxUncommittedOffsets.
assertThat("Current tests require maxPollRecords < maxUncommittedOffsets", maxPollRecords, lessThanOrEqualTo(maxUncommittedOffsets));
MockitoAnnotations.initMocks(this);
- this.spout = new KafkaSpout<>(spoutConfig);
+ spout = new KafkaSpout<>(spoutConfig);
}
- private void populateTopicData(String topicName, int msgCount) throws Exception {
- kafkaUnitRule.getKafkaUnit().createTopic(topicName);
-
- for (int i = 0; i < msgCount; i++) {
- ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
- topicName, Integer.toString(i),
- Integer.toString(i));
-
- kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
- }
- }
-
- private void initializeSpout(int msgCount) throws Exception {
- populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
- spout.open(conf, topologyContext, collector);
- spout.activate();
+ private void prepareSpout(int msgCount) throws Exception {
+ SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+ SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
}
private ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) throws Exception {
assertThat("The message count is less than maxUncommittedOffsets. This test is not meaningful with this configuration.", messageCount, greaterThanOrEqualTo(maxUncommittedOffsets));
//The spout must respect maxUncommittedOffsets when requesting/emitting tuples
- initializeSpout(messageCount);
+ prepareSpout(messageCount);
//Try to emit all messages. Ensure only maxUncommittedOffsets are emitted
ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
for (int i = 0; i < messageCount; i++) {
spout.nextTuple();
- };
+ }
verify(collector, times(maxUncommittedOffsets)).emit(
anyString(),
anyList(),
@@ -128,6 +122,7 @@ public class MaxUncommittedOffsetTest {
spout.ack(messageId);
}
Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+
spout.nextTuple();
//Now check that the spout will emit another maxUncommittedOffsets messages
@@ -183,8 +178,60 @@ public class MaxUncommittedOffsetTest {
@Test
public void testNextTupleWillNotEmitMoreThanMaxUncommittedOffsetsPlusMaxPollRecordsMessages() throws Exception {
- //The upper bound on uncommitted offsets should be maxUncommittedOffsets + maxPollRecords - 1
- //This is reachable by emitting maxUncommittedOffsets messages, acking the first message, then polling.
+ /*
+ For each partition the spout is allowed to retry all tuples between the committed offset and maxUncommittedOffsets ahead of it.
+ It is not allowed to retry tuples past that limit.
+ This makes the actual limit per partition maxUncommittedOffsets + maxPollRecords - 1,
+ reached if the tuple at the maxUncommittedOffsets limit is the earliest retriable tuple,
+ or if the spout is 1 tuple below the limit and receives a full poll of maxPollRecords tuples.
+ */
+
+ try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
+ //First check that maxUncommittedOffsets is respected when emitting from scratch
+ ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
+ reset(collector);
+
+ //Fail only the last tuple
+ List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
+ KafkaSpoutMessageId failedMessageId = messageIdList.get(messageIdList.size() - 1);
+ spout.fail(failedMessageId);
+
+ //Offset 0 to maxUncommittedOffsets - 2 are pending, maxUncommittedOffsets - 1 is failed but not retriable
+ //The spout should not emit any more tuples.
+ spout.nextTuple();
+ verify(collector, never()).emit(
+ anyString(),
+ anyList(),
+ any(KafkaSpoutMessageId.class));
+
+ //Allow the failed record to retry
+ Time.advanceTimeSecs(initialRetryDelaySecs);
+ for (int i = 0; i < maxPollRecords; i++) {
+ spout.nextTuple();
+ }
+ ArgumentCaptor<KafkaSpoutMessageId> secondRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collector, times(maxPollRecords)).emit(
+ anyString(),
+ anyList(),
+ secondRunMessageIds.capture());
+ reset(collector);
+ assertThat(secondRunMessageIds.getAllValues().get(0), is(failedMessageId));
+
+ //There should now be maxUncommittedOffsets + maxPollRecords emitted in all.
+ //Fail the last emitted tuple and verify that the spout won't retry it because it's above the emit limit.
+ spout.fail(secondRunMessageIds.getAllValues().get(secondRunMessageIds.getAllValues().size() - 1));
+ Time.advanceTimeSecs(initialRetryDelaySecs);
+ spout.nextTuple();
+ verify(collector, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+ }
+ }
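
Under this test's configuration (maxUncommittedOffsets = 10, maxPollRecords = 5) the worst-case bound described in the comment at the top of the test is 10 + 5 - 1 = 14 uncommitted offsets: the spout may poll while it is one tuple below the limit and receive a full poll. A small sketch of the bound (illustrative arithmetic only):

    class UncommittedBoundSketch {
        // Worst case: the spout polls while maxUncommittedOffsets - 1 tuples
        // past the committed offset, and the poll returns maxPollRecords more.
        static int maxEmittedPastCommit(int maxUncommittedOffsets, int maxPollRecords) {
            return (maxUncommittedOffsets - 1) + maxPollRecords;
        }

        public static void main(String[] args) {
            System.out.println(maxEmittedPastCommit(10, 5)); // 14
        }
    }
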
+
+ @Test
+ public void testNextTupleWillAllowRetryForTuplesBelowEmitLimit() throws Exception {
+ /*
+ For each partition the spout is allowed to retry all tuples between the committed offset and maxUncommittedOffsets ahead of it.
+ It must retry tuples within that limit, even if more tuples were emitted.
+ */
try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
//First check that maxUncommittedOffsets is respected when emitting from scratch
ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
@@ -192,9 +239,9 @@ public class MaxUncommittedOffsetTest {
failAllExceptTheFirstMessageThenCommit(messageIds);
- //Offset 0 is acked, 1 to maxUncommittedOffsets - 1 are failed
+ //Offset 0 is committed, 1 to maxUncommittedOffsets - 1 are failed but not retriable
//The spout should now emit another maxPollRecords messages
- //This is allowed because the acked message brings the numUncommittedOffsets below the cap
+ //This is allowed because the committed message brings the numUncommittedOffsets below the cap
for (int i = 0; i < maxUncommittedOffsets; i++) {
spout.nextTuple();
}
@@ -216,18 +263,20 @@ public class MaxUncommittedOffsetTest {
}
assertThat("Expected the newly emitted messages to have no overlap with the first batch", secondRunOffsets.removeAll(firstRunOffsets), is(false));
- //Offset 0 is acked, 1 to maxUncommittedOffsets-1 are failed, maxUncommittedOffsets to maxUncommittedOffsets + maxPollRecords-1 are emitted
- //There are now maxUncommittedOffsets-1 + maxPollRecords records emitted past the last committed offset
- //Advance time so the failed tuples become ready for retry, and check that the spout will emit retriable tuples as long as numNonRetriableEmittedTuples < maxUncommittedOffsets
-
- int numNonRetriableEmittedTuples = maxPollRecords; //The other tuples were failed and are becoming retriable
- int allowedPolls = (int)Math.ceil((maxUncommittedOffsets - numNonRetriableEmittedTuples)/(double)maxPollRecords);
+ //Offset 0 is committed, 1 to maxUncommittedOffsets-1 are failed, maxUncommittedOffsets to maxUncommittedOffsets + maxPollRecords-1 are emitted
+ //Fail the last tuples so only offset 0 is not failed.
+ //Advance time so the failed tuples become ready for retry, and check that the spout will emit retriable tuples
+ //for all the failed tuples that are within maxUncommittedOffsets tuples of the committed offset
+ //This means 1 to maxUncommittedOffsets, but not maxUncommittedOffsets+1...maxUncommittedOffsets+maxPollRecords-1
+ for(KafkaSpoutMessageId msgId : secondRunMessageIds.getAllValues()) {
+ spout.fail(msgId);
+ }
Time.advanceTimeSecs(initialRetryDelaySecs);
for (int i = 0; i < numMessages; i++) {
spout.nextTuple();
}
ArgumentCaptor<KafkaSpoutMessageId> thirdRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- verify(collector, times(allowedPolls*maxPollRecords)).emit(
+ verify(collector, times(maxUncommittedOffsets)).emit(
anyString(),
anyList(),
thirdRunMessageIds.capture());
@@ -238,8 +287,7 @@ public class MaxUncommittedOffsetTest {
thirdRunOffsets.add(msgId.offset());
}
- assertThat("Expected the emitted messages to be retries of the failed tuples from the first batch", thirdRunOffsets, everyItem(isIn(firstRunOffsets)));
+ assertThat("Expected the emitted messages to be retries of the failed tuples from the first batch, plus the first failed tuple from the second batch", thirdRunOffsets, everyItem(either(isIn(firstRunOffsets)).or(is(secondRunMessageIds.getAllValues().get(0).offset()))));
}
}
-
}
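
The final assertion above pins down the retry window: with offset 0 acked (so the committed offset is 1) and maxUncommittedOffsets = 10, offsets 1 through 10 are retriable while offsets 11 and beyond are not, which is why the third run emits exactly maxUncommittedOffsets tuples. A sketch of that window check (a hypothetical helper, not the spout's internals):

    class RetryWindowSketch {
        // A failed offset may be retried only if it lies within
        // maxUncommittedOffsets of the committed offset of its partition.
        static boolean isRetriable(long offset, long committedOffset, int maxUncommittedOffsets) {
            return offset < committedOffset + maxUncommittedOffsets;
        }

        public static void main(String[] args) {
            long committed = 1; // offset 0 was acked and committed
            System.out.println(isRetriable(10, committed, 10)); // true: first tuple of the second batch
            System.out.println(isRetriable(11, committed, 10)); // false: past the emit limit
        }
    }
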
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
index e97c7e1..fe3325c 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
@@ -16,10 +16,11 @@
package org.apache.storm.kafka.spout;
+import org.apache.storm.kafka.spout.NamedTopicFilter;
+
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
-
import static org.mockito.Mockito.when;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
index 877efdc..335ab31 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
@@ -16,6 +16,8 @@
package org.apache.storm.kafka.spout;
+import org.apache.storm.kafka.spout.PatternTopicFilter;
+
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
deleted file mode 100644
index 436d052..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout;
-
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
-
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.storm.kafka.KafkaUnitRule;
-import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Values;
-import org.junit.Rule;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyListOf;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
-import org.apache.storm.utils.Time;
-import org.apache.storm.utils.Time.SimulatedTime;
-import org.hamcrest.Matchers;
-import org.junit.Before;
-import org.mockito.Captor;
-import org.mockito.MockitoAnnotations;
-
-public class SingleTopicKafkaSpoutTest {
-
- @Rule
- public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
-
- @Captor
- private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
-
- private final TopologyContext topologyContext = mock(TopologyContext.class);
- private final Map<String, Object> conf = new HashMap<>();
- private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
- private final long commitOffsetPeriodMs = 2_000;
- private KafkaConsumer<String, String> consumerSpy;
- private KafkaConsumerFactory<String, String> consumerFactory;
- private KafkaSpout<String, String> spout;
- private final int maxPollRecords = 10;
- private final int maxRetries = 3;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
- .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
- .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
- maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
- .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
- .build();
- this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
- this.consumerFactory = new KafkaConsumerFactory<String, String>() {
- @Override
- public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
- return consumerSpy;
- }
-
- };
- this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
- }
-
- void populateTopicData(String topicName, int msgCount) throws InterruptedException, ExecutionException, TimeoutException {
- kafkaUnitRule.getKafkaUnit().createTopic(topicName);
-
- for (int i = 0; i < msgCount; i++) {
- ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
- topicName, Integer.toString(i),
- Integer.toString(i));
- kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
- }
- }
-
- private void initializeSpout(int msgCount) throws InterruptedException, ExecutionException, TimeoutException {
- populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
- spout.open(conf, topologyContext, collector);
- spout.activate();
- }
-
- /*
- * Asserts that commitSync has been called once,
- * that there are only commits on one topic,
- * and that the committed offset covers messageCount messages
- */
- private void verifyAllMessagesCommitted(long messageCount) {
- verify(consumerSpy, times(1)).commitSync(commitCapture.capture());
- Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
- assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1));
- OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue();
- assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount));
- }
-
- @Test
- public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() throws Exception {
- try (SimulatedTime simulatedTime = new SimulatedTime()) {
- int messageCount = maxPollRecords * 2;
- initializeSpout(messageCount);
-
- //Emit all messages and fail the first one while acking the rest
- for (int i = 0; i < messageCount; i++) {
- spout.nextTuple();
- }
- ArgumentCaptor<KafkaSpoutMessageId> messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- verify(collector, times(messageCount)).emit(anyString(), anyList(), messageIdCaptor.capture());
- List<KafkaSpoutMessageId> messageIds = messageIdCaptor.getAllValues();
- for (int i = 1; i < messageIds.size(); i++) {
- spout.ack(messageIds.get(i));
- }
- KafkaSpoutMessageId failedTuple = messageIds.get(0);
- spout.fail(failedTuple);
-
- //Advance the time and replay the failed tuple.
- reset(collector);
- spout.nextTuple();
- ArgumentCaptor<KafkaSpoutMessageId> failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- verify(collector).emit(anyString(), anyList(), failedIdReplayCaptor.capture());
-
- assertThat("Expected replay of failed tuple", failedIdReplayCaptor.getValue(), is(failedTuple));
-
- /* Ack the tuple, and commit.
- * Since the tuple is more than max poll records behind the most recent emitted tuple, the consumer won't catch up in this poll.
- */
- Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs);
- spout.ack(failedIdReplayCaptor.getValue());
- spout.nextTuple();
- verify(consumerSpy).commitSync(commitCapture.capture());
-
- Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue();
- TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
- assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp));
- assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCount));
-
- /* Verify that the following acked (now committed) tuples are not emitted again
- * Since the consumer position was somewhere in the middle of the acked tuples when the commit happened,
- * this verifies that the spout keeps the consumer position ahead of the committed offset when committing
- */
- reset(collector);
- //Just do a few polls to check that nothing more is emitted
- for(int i = 0; i < 3; i++) {
- spout.nextTuple();
- }
- verify(collector, never()).emit(anyString(), anyList(), anyObject());
- }
- }
-
- @Test
- public void shouldContinueWithSlowDoubleAcks() throws Exception {
- try (SimulatedTime simulatedTime = new SimulatedTime()) {
- int messageCount = 20;
- initializeSpout(messageCount);
-
- //play 1st tuple
- ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
- spout.nextTuple();
- verify(collector).emit(anyString(), anyList(), messageIdToDoubleAck.capture());
- spout.ack(messageIdToDoubleAck.getValue());
-
- //Emit some more messages
- for(int i = 0; i < messageCount / 2; i++) {
- spout.nextTuple();
- }
-
- spout.ack(messageIdToDoubleAck.getValue());
-
- //Emit any remaining messages
- for(int i = 0; i < messageCount; i++) {
- spout.nextTuple();
- }
-
- //Verify that all messages are emitted, ack all the messages
- ArgumentCaptor<Object> messageIds = ArgumentCaptor.forClass(Object.class);
- verify(collector, times(messageCount)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM),
- anyList(),
- messageIds.capture());
- for(Object id : messageIds.getAllValues()) {
- spout.ack(id);
- }
-
- Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
- //Commit offsets
- spout.nextTuple();
-
- verifyAllMessagesCommitted(messageCount);
- }
- }
-
- @Test
- public void shouldEmitAllMessages() throws Exception {
- try (SimulatedTime simulatedTime = new SimulatedTime()) {
- int messageCount = 10;
- initializeSpout(messageCount);
-
- //Emit all messages and check that they are emitted. Ack the messages too
- for(int i = 0; i < messageCount; i++) {
- spout.nextTuple();
- ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
- verify(collector).emit(
- eq(SingleTopicKafkaSpoutConfiguration.STREAM),
- eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
- Integer.toString(i),
- Integer.toString(i))),
- messageId.capture());
- spout.ack(messageId.getValue());
- reset(collector);
- }
-
- Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
- //Commit offsets
- spout.nextTuple();
-
- verifyAllMessagesCommitted(messageCount);
- }
- }
-
- @Test
- public void shouldReplayInOrderFailedMessages() throws Exception {
- try (SimulatedTime simulatedTime = new SimulatedTime()) {
- int messageCount = 10;
- initializeSpout(messageCount);
-
- //play and ack 1 tuple
- ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
- spout.nextTuple();
- verify(collector).emit(anyString(), anyList(), messageIdAcked.capture());
- spout.ack(messageIdAcked.getValue());
- reset(collector);
-
- //play and fail 1 tuple
- ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
- spout.nextTuple();
- verify(collector).emit(anyString(), anyList(), messageIdFailed.capture());
- spout.fail(messageIdFailed.getValue());
- reset(collector);
-
- //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
- for(int i = 0; i < messageCount; i++) {
- spout.nextTuple();
- }
-
- ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
- //All messages except the first acked message should have been emitted
- verify(collector, times(messageCount - 1)).emit(
- eq(SingleTopicKafkaSpoutConfiguration.STREAM),
- anyList(),
- remainingMessageIds.capture());
- for(Object id : remainingMessageIds.getAllValues()) {
- spout.ack(id);
- }
-
- Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
- //Commit offsets
- spout.nextTuple();
-
- verifyAllMessagesCommitted(messageCount);
- }
- }
-
- @Test
- public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
- try (SimulatedTime simulatedTime = new SimulatedTime()) {
- int messageCount = 10;
- initializeSpout(messageCount);
-
- //play 1st tuple
- ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
- spout.nextTuple();
- verify(collector).emit(anyString(), anyList(), messageIdToFail.capture());
- reset(collector);
-
- //play 2nd tuple
- ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
- spout.nextTuple();
- verify(collector).emit(anyString(), anyList(), messageIdToAck.capture());
- reset(collector);
-
- //ack 2nd tuple
- spout.ack(messageIdToAck.getValue());
- //fail 1st tuple
- spout.fail(messageIdToFail.getValue());
-
- //Emit all remaining messages. Failed tuples retry immediately with current configuration, so no need to wait.
- for(int i = 0; i < messageCount; i++) {
- spout.nextTuple();
- }
-
- ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
- //All messages except the first acked message should have been emitted
- verify(collector, times(messageCount - 1)).emit(
- eq(SingleTopicKafkaSpoutConfiguration.STREAM),
- anyList(),
- remainingIds.capture());
- for(Object id : remainingIds.getAllValues()) {
- spout.ack(id);
- }
-
- Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
- //Commit offsets
- spout.nextTuple();
-
- verifyAllMessagesCommitted(messageCount);
- }
- }
-
- @Test
- public void shouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception {
- //The spout must reemit retriable tuples, even if they fail out of order.
- //The spout should be able to skip tuples it has already emitted when retrying messages, even if those tuples are also retries.
- int messageCount = 10;
- initializeSpout(messageCount);
-
- //play all tuples
- for (int i = 0; i < messageCount; i++) {
- spout.nextTuple();
- }
- ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- verify(collector, times(messageCount)).emit(anyString(), anyList(), messageIds.capture());
- reset(collector);
- //Fail tuple 5 and 3, call nextTuple, then fail tuple 2
- List<KafkaSpoutMessageId> capturedMessageIds = messageIds.getAllValues();
- spout.fail(capturedMessageIds.get(5));
- spout.fail(capturedMessageIds.get(3));
- spout.nextTuple();
- spout.fail(capturedMessageIds.get(2));
-
- //Check that the spout will reemit all 3 failed tuples and no other tuples
- ArgumentCaptor<KafkaSpoutMessageId> reemittedMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- for (int i = 0; i < messageCount; i++) {
- spout.nextTuple();
- }
- verify(collector, times(3)).emit(anyString(), anyList(), reemittedMessageIds.capture());
- Set<KafkaSpoutMessageId> expectedReemitIds = new HashSet<>();
- expectedReemitIds.add(capturedMessageIds.get(5));
- expectedReemitIds.add(capturedMessageIds.get(3));
- expectedReemitIds.add(capturedMessageIds.get(2));
- assertThat("Expected reemits to be the 3 failed tuples", new HashSet<>(reemittedMessageIds.getAllValues()), is(expectedReemitIds));
- }
-
- @Test
- public void shouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
- //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted
- int messageCount = 1;
- initializeSpout(messageCount);
-
- //Emit and fail the same tuple until we've reached retry limit
- for (int i = 0; i <= maxRetries; i++) {
- ArgumentCaptor<KafkaSpoutMessageId> messageIdFailed = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- spout.nextTuple();
- verify(collector).emit(anyString(), anyListOf(Object.class), messageIdFailed.capture());
- KafkaSpoutMessageId msgId = messageIdFailed.getValue();
- spout.fail(msgId);
- assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1));
- reset(collector);
- }
-
- //Verify that the tuple is not emitted again
- spout.nextTuple();
- verify(collector, never()).emit(anyString(), anyListOf(Object.class), anyObject());
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
new file mode 100644
index 0000000..f5b9423
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaUnitSetupHelper.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.KafkaUnit;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.mockito.ArgumentCaptor;
+
+public class SingleTopicKafkaUnitSetupHelper {
+
+ /**
+ * Using the given KafkaUnit instance, put some messages in the specified topic.
+ *
+ * @param kafkaUnit The KafkaUnit instance to use
+ * @param topicName The topic to produce messages for
+ * @param msgCount The number of messages to produce
+ */
+ public static void populateTopicData(KafkaUnit kafkaUnit, String topicName, int msgCount) throws Exception {
+ kafkaUnit.createTopic(topicName);
+
+ for (int i = 0; i < msgCount; i++) {
+ ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
+ topicName, Integer.toString(i),
+ Integer.toString(i));
+ kafkaUnit.sendMessage(producerRecord);
+ }
+ }
+
+ /*
+ * Asserts that commitSync has been called once,
+ * that there are only commits on one topic,
+ * and that the committed offset covers messageCount messages
+ */
+ public static <K, V> void verifyAllMessagesCommitted(KafkaConsumer<K, V> consumerSpy,
+ ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture, long messageCount) {
+ verify(consumerSpy, times(1)).commitSync(commitCapture.capture());
+ Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
+ assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1));
+ OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue();
+ assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount));
+ }
+
+ /**
+ * Open and activate a KafkaSpout that acts as a single-task/executor spout.
+ *
+ * @param <K> Kafka key type
+ * @param <V> Kafka value type
+ * @param spout The spout to prepare
+ * @param topoConf The topoConf
+ * @param topoContextMock The TopologyContext mock
+ * @param collectorMock The output collector mock
+ */
+ public static <K, V> void initializeSpout(KafkaSpout<K, V> spout, Map<String, Object> topoConf, TopologyContext topoContextMock,
+ SpoutOutputCollector collectorMock) throws Exception {
+ when(topoContextMock.getThisTaskIndex()).thenReturn(0);
+ when(topoContextMock.getComponentTasks(anyString())).thenReturn(Collections.singletonList(0));
+ spout.open(topoConf, topoContextMock, collectorMock);
+ spout.activate();
+ }
+
+}
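
For reference, tests elsewhere in this commit combine the two helpers into a single setup step; this fragment mirrors the prepareSpout method added to MaxUncommittedOffsetTest above (the fields are the test class's own mocks and KafkaUnitRule):

    private void prepareSpout(int msgCount) throws Exception {
        SingleTopicKafkaUnitSetupHelper.populateTopicData(
            kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
    }
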
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
new file mode 100644
index 0000000..3aad61e
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mockingDetails;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class SpoutWithMockedConsumerSetupHelper {
+
+ /**
+ * Creates, opens, and activates a KafkaSpout using a mocked consumer. The subscription should be a mock object, since this method bypasses
+ * the subscription logic and instead configures the mocked consumer to act as if the specified partitions are assigned to it.
+ *
+ * @param <K> The Kafka key type
+ * @param <V> The Kafka value type
+ * @param spoutConfig The spout config to use
+ * @param topoConf The topo conf to pass to the spout
+ * @param contextMock The topo context to pass to the spout
+ * @param collectorMock The mocked collector to pass to the spout
+ * @param consumerMock The mocked consumer
+ * @param assignedPartitions The partitions to assign to this spout. The consumer will act like these partitions are assigned to it.
+ * @return The spout
+ */
+ public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf,
+ TopologyContext contextMock, SpoutOutputCollector collectorMock, final KafkaConsumer<K, V> consumerMock, TopicPartition... assignedPartitions) {
+ Subscription subscriptionMock = spoutConfig.getSubscription();
+ if (!mockingDetails(subscriptionMock).isMock()) {
+ throw new IllegalStateException("Use a mocked subscription with this method; it avoids complex stubbing");
+ }
+
+ final Set<TopicPartition> assignedPartitionsSet = new HashSet<>(Arrays.asList(assignedPartitions));
+
+ when(consumerMock.assignment()).thenReturn(assignedPartitionsSet);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ ConsumerRebalanceListener listener = (ConsumerRebalanceListener) invocation.getArguments()[1];
+ listener.onPartitionsAssigned(assignedPartitionsSet);
+ return null;
+ }
+
+ }).when(subscriptionMock).subscribe(any(KafkaConsumer.class), any(ConsumerRebalanceListener.class), any(TopologyContext.class));
+
+ KafkaConsumerFactory<K, V> consumerFactory = new KafkaConsumerFactory<K, V>() {
+ @Override
+ public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+ return consumerMock;
+ }
+ };
+ KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+ spout.open(topoConf, contextMock, collectorMock);
+ spout.activate();
+
+ return spout;
+ }
+
+ /**
+ * Creates sequential dummy records.
+ *
+ * @param <K> The Kafka key type
+ * @param <V> The Kafka value type
+ * @param topic The topic partition to create records for
+ * @param startingOffset The starting offset of the records
+ * @param numRecords The number of records to create
+ * @return The dummy records
+ */
+ public static <K, V> List<ConsumerRecord<K, V>> createRecords(TopicPartition topic, long startingOffset, int numRecords) {
+ List<ConsumerRecord<K, V>> recordsForPartition = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ recordsForPartition.add(new ConsumerRecord<K, V>(topic.topic(), topic.partition(), startingOffset + i, null, null));
+ }
+ return recordsForPartition;
+ }
+
+ /**
+ * Creates messages for the given offsets, emits them by calling nextTuple once per offset, and returns the captured message ids.
+ *
+ * @param <K> The Kafka key type
+ * @param <V> The Kafka value type
+ * @param spout The spout
+ * @param consumerMock The consumer used by the spout
+ * @param expectedEmits The number of expected emits
+ * @param collectorMock The collector used by the spout
+ * @param partition The partition to emit messages on
+ * @param offsetsToEmit The offsets to emit
+ * @return The message ids emitted by the spout during the nextTuple calls
+ */
+ public static <K, V> List<KafkaSpoutMessageId> pollAndEmit(KafkaSpout<K, V> spout, KafkaConsumer<K, V> consumerMock, int expectedEmits,
+ SpoutOutputCollector collectorMock, TopicPartition partition, int... offsetsToEmit) {
+ return pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, Collections.singletonMap(partition, offsetsToEmit));
+ }
+
+ /**
+ * Creates messages for the given offsets, emits them by calling nextTuple once per offset, and returns the captured message ids.
+ *
+ * @param <K> The Kafka key type
+ * @param <V> The Kafka value type
+ * @param spout The spout
+ * @param consumerMock The consumer used by the spout
+ * @param expectedEmits The number of expected emits
+ * @param collectorMock The collector used by the spout
+ * @param offsetsToEmit The offsets to emit per partition
+ * @return The message ids emitted by the spout during the nextTuple calls
+ */
+ public static <K, V> List<KafkaSpoutMessageId> pollAndEmit(KafkaSpout<K, V> spout, KafkaConsumer<K, V> consumerMock, int expectedEmits,
+ SpoutOutputCollector collectorMock, Map<TopicPartition, int[]> offsetsToEmit) {
+ int totalOffsets = 0;
+ Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+ for (Entry<TopicPartition, int[]> entry : offsetsToEmit.entrySet()) {
+ TopicPartition tp = entry.getKey();
+ List<ConsumerRecord<K, V>> tpRecords = new ArrayList<>();
+ for (Integer offset : entry.getValue()) {
+ tpRecords.add(new ConsumerRecord<K, V>(tp.topic(), tp.partition(), offset, null, null));
+ totalOffsets++;
+ }
+ records.put(tp, tpRecords);
+ }
+
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords<>(records));
+
+ for (int i = 0; i < totalOffsets; i++) {
+ spout.nextTuple();
+ }
+
+ ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock, times(expectedEmits)).emit(anyString(), anyList(), messageIds.capture());
+ return messageIds.getAllValues();
+ }
+
+}
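
Taken together, a typical use of this helper looks roughly like the following sketch: a config whose subscription is a mock, a spout opened against the mocked consumer, and three offsets driven through pollAndEmit. The createKafkaSpoutConfigBuilder(Subscription, int) overload comes from the SingleTopicKafkaSpoutConfiguration changes below; the port value and empty topoConf are placeholders.

package org.apache.storm.kafka.spout;

import static org.mockito.Mockito.mock;

import java.util.HashMap;
import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;

public class MockedConsumerUsageSketch {

    @SuppressWarnings("unchecked")
    static void pollThreeOffsets() {
        // The subscription must be a mock; setupSpout enforces this.
        Subscription subscriptionMock = mock(Subscription.class);
        KafkaSpoutConfig<String, String> spoutConfig = SingleTopicKafkaSpoutConfiguration
            .createKafkaSpoutConfigBuilder(subscriptionMock, 9092) // port is a placeholder
            .build();

        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
        TopologyContext contextMock = mock(TopologyContext.class);
        SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
        TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);

        // Opens and activates the spout with the partition "assigned" to it.
        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(
            spoutConfig, new HashMap<String, Object>(), contextMock, collectorMock, consumerMock, partition);

        // Stubs poll() with offsets 0..2, calls nextTuple() three times,
        // and verifies exactly three emits before returning the message ids.
        List<KafkaSpoutMessageId> ids = SpoutWithMockedConsumerSetupHelper.pollAndEmit(
            spout, consumerMock, 3, collectorMock, partition, 0, 1, 2);
    }
}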
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index d5c052b..b178687 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -17,17 +17,20 @@
*/
package org.apache.storm.kafka.spout.builders;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
import java.util.List;
-
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.Func;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.kafka.spout.Subscription;
import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
@@ -46,7 +49,7 @@ public class SingleTopicKafkaSpoutConfiguration {
public static StormTopology getTopologyKafkaSpout(int port) {
final TopologyBuilder tp = new TopologyBuilder();
- tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfigBuilder(port).build()), 1);
+ tp.setSpout("kafka_spout", new KafkaSpout<>(SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(port).build()), 1);
tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
return tp.createTopology();
}
@@ -57,21 +60,33 @@ public class SingleTopicKafkaSpoutConfiguration {
return new Values(r.topic(), r.key(), r.value());
}
};
-
- public static KafkaSpoutConfig.Builder<String,String> getKafkaSpoutConfigBuilder(int port) {
- return KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)
- .setRecordTranslator(TOPIC_KEY_VALUE_FUNC,
- new Fields("topic", "key", "value"), STREAM)
- .setGroupId("kafkaSpoutTestGroup")
- .setMaxPollRecords(5)
- .setRetry(getRetryService())
- .setOffsetCommitPeriodMs(10_000)
- .setFirstPollOffsetStrategy(EARLIEST)
- .setMaxUncommittedOffsets(250)
- .setPollTimeoutMs(1000);
+
+ public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(int port) {
+ return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC));
+ }
+
+ public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(Subscription subscription, int port) {
+ return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<String, String>("127.0.0.1:" + port, subscription));
}
-
- protected static KafkaSpoutRetryService getRetryService() {
- return KafkaSpoutConfig.UNIT_TEST_RETRY_SERVICE;
+
+ public static KafkaSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {
+ return config
+ .setRecordTranslator(TOPIC_KEY_VALUE_FUNC,
+ new Fields("topic", "key", "value"), STREAM)
+ .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+ .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
+ .setRetry(getNoDelayRetryService())
+ .setOffsetCommitPeriodMs(10_000)
+ .setFirstPollOffsetStrategy(EARLIEST)
+ .setMaxUncommittedOffsets(250)
+ .setPollTimeoutMs(1000);
+ }
+
+ protected static KafkaSpoutRetryService getNoDelayRetryService() {
+ // Retry in a tight loop to keep unit tests fast.
+ return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+ DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
}
}
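
The builder hunk above drops the setGroupId/setMaxPollRecords convenience setters in favor of the generic setProp passthrough keyed by ConsumerConfig constants; any other consumer property follows the same pattern. A minimal sketch, with placeholder broker address and topic:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

// Any Kafka consumer property can go through setProp; these two mirror
// the calls in setCommonSpoutConfig above.
KafkaSpoutConfig<String, String> config =
    KafkaSpoutConfig.builder("127.0.0.1:9092", "my-topic")
        .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
        .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
        .build();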
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
index abbacf9..9972d4c 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/internal/OffsetManagerTest.java
@@ -18,10 +18,12 @@ package org.apache.storm.kafka.spout.internal;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
import java.util.NoSuchElementException;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
@@ -30,6 +32,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
public class OffsetManagerTest {
+ private static final String COMMIT_METADATA = "{\"topologyId\":\"tp1\",\"taskId\":3,\"threadName\":\"Thread-20\"}";
@Rule
public ExpectedException expect = ExpectedException.none();
@@ -56,12 +59,12 @@ public class OffsetManagerTest {
manager.addToAckMsgs(getMessageId(initialFetchOffset + 2));
manager.addToAckMsgs(getMessageId(initialFetchOffset + 6));
- assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset().offset(), is(initialFetchOffset + 3));
+ assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(COMMIT_METADATA).offset(), is(initialFetchOffset + 3));
manager.addToAckMsgs(getMessageId(initialFetchOffset + 5));
- assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted",
- manager.findNextCommitOffset().offset(), is(initialFetchOffset + 7));
+ assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted",
+ manager.findNextCommitOffset(COMMIT_METADATA), is(new OffsetAndMetadata(initialFetchOffset + 7, COMMIT_METADATA)));
}
@Test
@@ -71,17 +74,17 @@ public class OffsetManagerTest {
manager.addToEmitMsgs(initialFetchOffset + 6);
manager.addToAckMsgs(getMessageId(initialFetchOffset + 6));
- assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(), is(nullValue()));
+ assertThat("The offset manager should not skip past offset 5 which is still pending", manager.findNextCommitOffset(COMMIT_METADATA), is(nullValue()));
manager.addToAckMsgs(getMessageId(initialFetchOffset + 5));
assertThat("The offset manager should skip past the gap in acked messages, since the messages were not emitted",
- manager.findNextCommitOffset().offset(), is(initialFetchOffset + 7));
+ manager.findNextCommitOffset(COMMIT_METADATA), is(new OffsetAndMetadata(initialFetchOffset + 7, COMMIT_METADATA)));
}
@Test
public void testFindNextCommittedOffsetWithNoAcks() {
- OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+ OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
assertThat("There shouldn't be a next commit offset when nothing has been acked", nextCommitOffset, is(nullValue()));
}
@@ -92,7 +95,7 @@ public class OffsetManagerTest {
* lastProcessedMessageOffset + 1. "
*/
emitAndAckMessage(getMessageId(initialFetchOffset));
- OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+ OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 1));
}
@@ -100,7 +103,7 @@ public class OffsetManagerTest {
public void testFindNextCommitOffsetWithMultipleOutOfOrderAcks() {
emitAndAckMessage(getMessageId(initialFetchOffset + 1));
emitAndAckMessage(getMessageId(initialFetchOffset));
- OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+ OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
assertThat("The next commit offset should be one past the processed message offset", nextCommitOffset.offset(), is(initialFetchOffset + 2));
}
@@ -109,7 +112,7 @@ public class OffsetManagerTest {
emitAndAckMessage(getMessageId(initialFetchOffset + 2));
manager.addToEmitMsgs(initialFetchOffset + 1);
emitAndAckMessage(getMessageId(initialFetchOffset));
- OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+ OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
assertThat("The next commit offset should cover the sequential acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1));
}
@@ -123,7 +126,7 @@ public class OffsetManagerTest {
*/
emitAndAckMessage(getMessageId(initialFetchOffset + 2));
emitAndAckMessage(getMessageId(initialFetchOffset));
- OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+ OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
assertThat("The next commit offset should cover all the acked offsets, since the offset in the gap hasn't been emitted and doesn't exist",
nextCommitOffset.offset(), is(initialFetchOffset + 3));
}
@@ -132,7 +135,7 @@ public class OffsetManagerTest {
public void testFindNextCommitOffsetWithUnackedOffsetGap() {
manager.addToEmitMsgs(initialFetchOffset + 1);
emitAndAckMessage(getMessageId(initialFetchOffset));
- OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset();
+ OffsetAndMetadata nextCommitOffset = manager.findNextCommitOffset(COMMIT_METADATA);
assertThat("The next commit offset should cover the contiguously acked offsets", nextCommitOffset.offset(), is(initialFetchOffset + 1));
}
@@ -140,7 +143,7 @@ public class OffsetManagerTest {
public void testFindNextCommitOffsetWhenTooLowOffsetIsAcked() {
OffsetManager startAtHighOffsetManager = new OffsetManager(testTp, 10);
emitAndAckMessage(getMessageId(0));
- OffsetAndMetadata nextCommitOffset = startAtHighOffsetManager.findNextCommitOffset();
+ OffsetAndMetadata nextCommitOffset = startAtHighOffsetManager.findNextCommitOffset(COMMIT_METADATA);
assertThat("Acking an offset earlier than the committed offset should have no effect", nextCommitOffset, is(nullValue()));
}
@@ -170,4 +173,24 @@ public class OffsetManagerTest {
manager.addToAckMsgs(msgId);
}
+ @Test
+ public void testGetNthUncommittedOffsetAfterCommittedOffset() {
+ manager.addToEmitMsgs(initialFetchOffset + 1);
+ manager.addToEmitMsgs(initialFetchOffset + 2);
+ manager.addToEmitMsgs(initialFetchOffset + 5);
+ manager.addToEmitMsgs(initialFetchOffset + 30);
+
+ assertThat("The third uncommitted offset should be 5", manager.getNthUncommittedOffsetAfterCommittedOffset(3), is(initialFetchOffset + 5L));
+ assertThat("The fourth uncommitted offset should be 30", manager.getNthUncommittedOffsetAfterCommittedOffset(4), is(initialFetchOffset + 30L));
+
+ expect.expect(NoSuchElementException.class);
+ manager.getNthUncommittedOffsetAfterCommittedOffset(5);
+ }
+
+ @Test
+ public void testCommittedFlagSetOnCommit() throws Exception {
+ assertFalse(manager.hasCommitted());
+ manager.commit(mock(OffsetAndMetadata.class));
+ assertTrue(manager.hasCommitted());
+ }
}
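
To make the semantics these tests pin down concrete: acked offsets commit contiguously, gaps that were never emitted are skipped, and the commit now carries the metadata string passed to findNextCommitOffset. A standalone sketch under those assumptions (the KafkaSpoutMessageId(TopicPartition, long) constructor is assumed available):

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
import org.apache.storm.kafka.spout.internal.OffsetManager;

public class OffsetManagerSemanticsSketch {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("test", 0);
        OffsetManager manager = new OffsetManager(tp, 0); // initial fetch offset 0

        manager.addToEmitMsgs(0);
        manager.addToEmitMsgs(2); // offset 1 is never emitted (e.g. compacted away)
        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 0));
        manager.addToAckMsgs(new KafkaSpoutMessageId(tp, 2));

        // The gap at offset 1 does not block the commit, since that offset was
        // never emitted; the next commit covers offsets 0..2 and carries the metadata.
        OffsetAndMetadata next = manager.findNextCommitOffset("{\"taskId\":0}");
        System.out.println(next.offset());          // 3
        System.out.println(next.metadata());        // {"taskId":0}

        manager.commit(next);
        System.out.println(manager.hasCommitted()); // true
    }
}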
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
new file mode 100644
index 0000000..9a2a682
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.ManualPartitionSubscription;
+import org.apache.storm.kafka.spout.ManualPartitioner;
+import org.apache.storm.kafka.spout.TopicFilter;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+public class ManualPartitionSubscriptionTest {
+
+ @Test
+ public void testCanReassignPartitions() {
+ ManualPartitioner partitionerMock = mock(ManualPartitioner.class);
+ TopicFilter filterMock = mock(TopicFilter.class);
+ KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+ ConsumerRebalanceListener listenerMock = mock(ConsumerRebalanceListener.class);
+ TopologyContext contextMock = mock(TopologyContext.class);
+ ManualPartitionSubscription subscription = new ManualPartitionSubscription(partitionerMock, filterMock);
+
+ List<TopicPartition> onePartition = Collections.singletonList(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
+ List<TopicPartition> twoPartitions = new ArrayList<>();
+ twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
+ twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1));
+ when(partitionerMock.partition(anyList(), any(TopologyContext.class)))
+ .thenReturn(onePartition)
+ .thenReturn(twoPartitions);
+
+ //Set the first assignment
+ subscription.subscribe(consumerMock, listenerMock, contextMock);
+
+ InOrder inOrder = inOrder(consumerMock, listenerMock);
+ inOrder.verify(consumerMock).assign(new HashSet<>(onePartition));
+ inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(onePartition));
+
+ reset(consumerMock, listenerMock);
+
+ when(consumerMock.assignment()).thenReturn(new HashSet<>(onePartition));
+
+ //Update to set the second assignment
+ subscription.refreshAssignment();
+
+ //The partition revocation hook must be called before the new partitions are assigned to the consumer,
+ //to allow the revocation hook to commit offsets for the revoked partitions.
+ inOrder.verify(listenerMock).onPartitionsRevoked(new HashSet<>(onePartition));
+ inOrder.verify(consumerMock).assign(new HashSet<>(twoPartitions));
+ inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(twoPartitions));
+ }
+
+}
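
Outside of tests, the same subscription would be assembled from concrete collaborators rather than mocks. A sketch of the wiring, assuming RoundRobinManualPartitioner and NamedTopicFilter are available on this branch alongside ManualPartitionSubscription (the port is a placeholder):

import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.ManualPartitionSubscription;
import org.apache.storm.kafka.spout.NamedTopicFilter;
import org.apache.storm.kafka.spout.RoundRobinManualPartitioner;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;

// The partitioner round-robins the matched partitions over the spout tasks;
// the filter picks the topics, mirroring the mocked collaborators above.
ManualPartitionSubscription subscription = new ManualPartitionSubscription(
    new RoundRobinManualPartitioner(),
    new NamedTopicFilter(SingleTopicKafkaSpoutConfiguration.TOPIC));

KafkaSpoutConfig<String, String> config = SingleTopicKafkaSpoutConfiguration
    .createKafkaSpoutConfigBuilder(subscription, 9092) // port is a placeholder
    .build();
KafkaSpout<String, String> spout = new KafkaSpout<>(config);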