Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/08 09:05:04 UTC
[03/14] storm git commit: STORM-2937: Overwrite storm-kafka-client
1.x-branch into 1.0.x-branch: copied external/storm-kafka-client from
1.x-branch
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
new file mode 100644
index 0000000..681953d
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.junit.Test;
+
+public class DefaultRecordTranslatorTest {
+ @Test
+ public void testBasic() {
+ DefaultRecordTranslator<String, String> trans = new DefaultRecordTranslator<>();
+ assertEquals(Arrays.asList("default"), trans.streams());
+ assertEquals(new Fields("topic", "partition", "offset", "key", "value"), trans.getFieldsFor("default"));
+ ConsumerRecord<String, String> cr = new ConsumerRecord<>("TOPIC", 100, 100, "THE KEY", "THE VALUE");
+ assertEquals(Arrays.asList("TOPIC", 100, 100l, "THE KEY", "THE VALUE"), trans.apply(cr));
+ }
+}
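
The test above pins down the default translator's contract: a single "default" stream and the five-field (topic, partition, offset, key, value) output layout. For comparison, here is a minimal sketch of a custom translator, assuming RecordTranslator declares exactly the three methods the test exercises; the class name and value-only field layout are illustrative, not part of this commit:

import java.util.Arrays;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Hypothetical translator that emits only the record value on the default stream.
public class ValueOnlyRecordTranslator<K, V> implements RecordTranslator<K, V> {
    @Override
    public List<Object> apply(ConsumerRecord<K, V> record) {
        return new Values(record.value());
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return new Fields("value");
    }

    @Override
    public List<String> streams() {
        return Arrays.asList("default");
    }
}

A translator like this trades the metadata fields for a leaner tuple; the spout's stream routing still works because streams() and getFieldsFor() stay consistent with what apply() emits.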
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
new file mode 100644
index 0000000..0467383
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.KafkaUnitRule;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+public abstract class KafkaSpoutAbstractTest {
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+ final TopologyContext topologyContext = mock(TopologyContext.class);
+ final Map<String, Object> conf = new HashMap<>();
+ final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+ final long commitOffsetPeriodMs;
+
+ KafkaConsumer<String, String> consumerSpy;
+ KafkaSpout<String, String> spout;
+
+ @Captor
+ ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+ private Time.SimulatedTime simulatedTime;
+ private KafkaSpoutConfig<String, String> spoutConfig;
+
+ /**
+ * This constructor should be called by the subclass's default constructor with the desired value.
+ * @param commitOffsetPeriodMs the commit offset period to use both when committing and when verifying committed messages
+ */
+ protected KafkaSpoutAbstractTest(long commitOffsetPeriodMs) {
+ this.commitOffsetPeriodMs = commitOffsetPeriodMs;
+ }
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ spoutConfig = createSpoutConfig();
+
+ consumerSpy = createConsumerSpy();
+
+ spout = new KafkaSpout<>(spoutConfig, createConsumerFactory());
+
+ simulatedTime = new Time.SimulatedTime();
+ }
+
+ private KafkaConsumerFactory<String, String> createConsumerFactory() {
+
+ return new KafkaConsumerFactory<String, String>() {
+ @Override
+ public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
+ return consumerSpy;
+ }
+
+ };
+ }
+
+ KafkaConsumer<String, String> createConsumerSpy() {
+ return spy(new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ simulatedTime.close();
+ }
+
+ abstract KafkaSpoutConfig<String, String> createSpoutConfig();
+
+ void prepareSpout(int messageCount) throws Exception {
+ SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
+ SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
+ }
+
+ /**
+ * Helper method that performs the following steps in sequence:
+ * <ul>
+ * <li>spout.nextTuple()</li>
+ * <li>verify messageId</li>
+ * <li>spout.ack(msgId)</li>
+ * <li>reset(collector) to be able to reuse the mock</li>
+ * </ul>
+ *
+ * @param offset offset of message to be verified
+ * @return {@link ArgumentCaptor} of the messageId verified
+ */
+ ArgumentCaptor<Object> nextTuple_verifyEmitted_ack_resetCollector(int offset) {
+ spout.nextTuple();
+
+ ArgumentCaptor<Object> messageId = verifyMessageEmitted(offset);
+
+ spout.ack(messageId.getValue());
+
+ reset(collectorMock);
+
+ return messageId;
+ }
+
+ // offset and messageId are used interchangeably
+ ArgumentCaptor<Object> verifyMessageEmitted(int offset) {
+ final ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
+
+ verify(collectorMock).emit(
+ eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+ eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
+ Integer.toString(offset),
+ Integer.toString(offset))),
+ messageId.capture());
+
+ return messageId;
+ }
+
+ void commitAndVerifyAllMessagesCommitted(long msgCount) {
+ // reset commit timer such that commit happens on next call to nextTuple()
+ Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+
+ //Commit offsets
+ spout.nextTuple();
+
+ verifyAllMessagesCommitted(msgCount);
+ }
+
+ /*
+ * Asserts that commitSync has been called once,
+ * that there are only commits on one topic,
+ * and that the committed offset covers messageCount messages
+ */
+ void verifyAllMessagesCommitted(long messageCount) {
+ verify(consumerSpy).commitSync(commitCapture.capture());
+
+ final Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
+ assertThat("Expected commits for only one topic partition", commits.entrySet().size(), is(1));
+
+ OffsetAndMetadata offset = commits.entrySet().iterator().next().getValue();
+ assertThat("Expected committed offset to cover all emitted messages", offset.offset(), is(messageCount));
+
+ reset(consumerSpy);
+ }
+}
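
KafkaSpoutAbstractTest leaves two hooks for concrete tests: the constructor argument fixing the commit period, and createSpoutConfig(). A hypothetical subclass could wire them up as sketched below; the class name and test body are assumptions, and the builder usage mirrors the setCommonSpoutConfig(...) pattern that appears in KafkaSpoutReactivationTest later in this commit:

import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.junit.Test;

// Hypothetical concrete test, assumed to live in the same package as the abstract class.
public class ExampleSingleTopicKafkaSpoutTest extends KafkaSpoutAbstractTest {

    public ExampleSingleTopicKafkaSpoutTest() {
        super(2_000); // commit every 2 seconds
    }

    @Override
    KafkaSpoutConfig<String, String> createSpoutConfig() {
        // Mirrors the builder wiring used by KafkaSpoutReactivationTest in this commit.
        return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
            KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
                SingleTopicKafkaSpoutConfiguration.TOPIC))
            .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
            .build();
    }

    @Test
    public void shouldEmitAckAndCommitAllMessages() throws Exception {
        final int messageCount = 3;
        prepareSpout(messageCount);
        for (int i = 0; i < messageCount; i++) {
            nextTuple_verifyEmitted_ack_resetCollector(i);
        }
        commitAndVerifyAllMessagesCommitted(messageCount);
    }
}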
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
new file mode 100644
index 0000000..90e906b
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.hamcrest.CoreMatchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class KafkaSpoutConfigTest {
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testBasic() {
+ KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic").build();
+ assertEquals(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST, conf.getFirstPollOffsetStrategy());
+ assertNull(conf.getConsumerGroupId());
+ assertTrue(conf.getTranslator() instanceof DefaultRecordTranslator);
+ HashMap<String, Object> expected = new HashMap<>();
+ expected.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1234");
+ expected.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ expected.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ expected.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ expected.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ assertEquals(expected, conf.getKafkaProps());
+ assertEquals(KafkaSpoutConfig.DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS, conf.getMetricsTimeBucketSizeInSecs());
+ }
+
+ @Test
+ public void testSetEmitNullTuplesToTrue() {
+ final KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+ .setEmitNullTuples(true)
+ .build();
+
+ assertTrue("Failed to set emit null tuples to true", conf.isEmitNullTuples());
+ }
+
+ @Test
+ public void testShouldNotChangeAutoOffsetResetPolicyWhenNotUsingAtLeastOnce() {
+ KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+ .build();
+
+ assertThat("When at-least-once is not specified, the spout should use the Kafka default auto offset reset policy",
+ conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), nullValue());
+ }
+
+ @Test
+ public void testWillRespectExplicitAutoOffsetResetPolicy() {
+ KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+ .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
+ .build();
+
+ assertThat("Should allow users to pick a different auto offset reset policy than the one recommended for the at-least-once processing guarantee",
+ (String)conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("none"));
+ }
+
+ @Test
+ public void testCanConfigureWithExplicitTrueBooleanAutoCommitMode() {
+ /*
+ * Since adding setProcessingGuarantee to KafkaSpoutConfig we don't want users to set "enable.auto.commit" in the consumer config,
+ * because setting the processing guarantee will do it automatically. For backward compatibility we need to be able to handle the
+ * property being set anyway for a few releases, and try to set a processing guarantee that corresponds to the property.
+ */
+
+ KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+ .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
+ .build();
+
+ assertThat("When setting enable auto commit to true explicitly the spout should use the 'none' processing guarantee",
+ conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE));
+ }
+
+ @Test
+ public void testCanConfigureWithExplicitFalseBooleanAutoCommitMode() {
+ KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+ .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
+ .build();
+
+ assertThat("When setting enable auto commit to false explicitly the spout should use the 'at-least-once' processing guarantee",
+ conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE));
+ }
+
+ @Test
+ public void testCanConfigureWithExplicitTrueStringAutoCommitMode() {
+ KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+ .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+ .build();
+
+ assertThat("When setting enable auto commit to true explicitly the spout should use the 'none' processing guarantee",
+ conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE));
+ }
+
+ @Test
+ public void testCanConfigureWithExplicitFalseStringAutoCommitMode() {
+ KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+ .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ .build();
+
+ assertThat("When setting enable auto commit explicitly to false the spout should use the 'at-least-once' processing guarantee",
+ conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE));
+ }
+
+ @Test
+ public void testCanGetKeyDeserializerWhenUsingDefaultBuilder() {
+ KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+ .build();
+
+ assertThat("When using the default builder methods, the key deserializer should default to StringDeserializer",
+ conf.getKeyDeserializer(), instanceOf(StringDeserializer.class));
+ }
+
+ @Test
+ public void testCanGetValueDeserializerWhenUsingDefaultBuilder() {
+ KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+ .build();
+
+ assertThat("When using the default builder methods, the value deserializer should default to StringDeserializer",
+ conf.getValueDeserializer(), instanceOf(StringDeserializer.class));
+ }
+
+ @Test
+ public void testCanOverrideDeprecatedDeserializerClassWithKafkaProps() {
+ KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+ .setKey(StringDeserializer.class)
+ .setValue(StringDeserializer.class)
+ .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+ .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+ .build();
+
+ assertThat("The last set key deserializer should be used, regardless of how it is set",
+ conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
+ assertThat("The last set value deserializer should be used, regardless of how it is set",
+ conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
+ }
+
+ private static class SerializableStringDeserializer implements SerializableDeserializer {
+
+ private final StringDeserializer delegate = new StringDeserializer();
+
+ @Override
+ public void configure(Map configs, boolean isKey) {
+ delegate.configure(configs, isKey);
+ }
+
+ @Override
+ public Object deserialize(String topic, byte[] data) {
+ return delegate.deserialize(topic, data);
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
+ }
+
+ @Test
+ public void testCanOverrideDeprecatedDeserializerInstanceWithKafkaProps() {
+ KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+ .setKey(new SerializableStringDeserializer())
+ .setValue(new SerializableStringDeserializer())
+ .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+ .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+ .build();
+
+ assertThat("The last set key deserializer should be used, regardless of how it is set",
+ conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
+ assertThat("The last set value deserializer should be used, regardless of how it is set",
+ conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
+ }
+
+ @Test
+ public void testCanOverrideKafkaPropsWithDeprecatedDeserializerSetter() {
+ KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+ .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+ .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+ .setKey(new SerializableStringDeserializer())
+ .setValue(new SerializableStringDeserializer())
+ .build();
+
+ assertThat("The last set key deserializer should be used, regardless of how it is set",
+ conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class));
+ assertThat("The last set value deserializer should be used, regardless of how it is set",
+ conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class));
+ }
+
+ @Test
+ public void testCanMixOldAndNewDeserializerSetter() {
+ KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+ .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+ .setKey(new SerializableStringDeserializer())
+ .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+ .setValue(new SerializableStringDeserializer())
+ .build();
+
+ assertThat("The last set key deserializer should be used, regardless of how it is set",
+ conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
+ assertThat("The last set value deserializer should be used, regardless of how it is set",
+ conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class));
+ }
+
+ @Test
+ public void testMetricsTimeBucketSizeInSecs() {
+ KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+ .setMetricsTimeBucketSizeInSecs(100)
+ .build();
+
+ assertEquals(100, conf.getMetricsTimeBucketSizeInSecs());
+ }
+}
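
Taken together, these tests pin down the builder's precedence rule: whichever setter runs last wins, whether it is the deprecated setKey/setValue pair or a raw setProp entry. As a usage sketch, assuming the single-argument KafkaSpout constructor and standard TopologyBuilder wiring (the component names and group id below are illustrative):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class ExampleTopologyWiring {
    public static void main(String[] args) {
        // Last setter wins, per the precedence tests above.
        KafkaSpoutConfig<String, String> spoutConfig =
            KafkaSpoutConfig.builder("localhost:1234", "topic")
                .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "example-group") // assumed group id
                .build();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout<>(spoutConfig), 1);
    }
}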
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
new file mode 100755
index 0000000..dbba04b
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.never;
+
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.mockito.InOrder;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.eq;
+
+public class KafkaSpoutEmitTest {
+
+ private final long offsetCommitPeriodMs = 2_000;
+ private final TopologyContext contextMock = mock(TopologyContext.class);
+ private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+ private final Map<String, Object> conf = new HashMap<>();
+ private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+ private KafkaConsumer<String, String> consumerMock;
+ private KafkaSpoutConfig<String, String> spoutConfig;
+
+ @Before
+ public void setUp() {
+ spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .build();
+ consumerMock = mock(KafkaConsumer.class);
+ }
+
+ @Test
+ public void testNextTupleEmitsAtMostOneTuple() {
+ //The spout should emit at most one message per call to nextTuple
+ //This is necessary for Storm to be able to throttle the spout according to maxSpoutPending
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+ records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 10));
+
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords<>(records));
+
+ spout.nextTuple();
+
+ verify(collectorMock, times(1)).emit(anyString(), anyList(), anyObject());
+ }
+
+ @Test
+ public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() throws IOException {
+ //The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded
+
+ //Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+ int numRecords = spoutConfig.getMaxUncommittedOffsets();
+ //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+ records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, numRecords));
+
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords<>(records));
+
+ for (int i = 0; i < numRecords; i++) {
+ spout.nextTuple();
+ }
+
+ ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), messageIds.capture());
+
+ for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+ spout.fail(messageId);
+ }
+
+ reset(collectorMock);
+
+ Time.advanceTime(50);
+ //No backoff for test retry service, just check that messages will retry immediately
+ for (int i = 0; i < numRecords; i++) {
+ spout.nextTuple();
+ }
+
+ ArgumentCaptor<KafkaSpoutMessageId> retryMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), retryMessageIds.capture());
+
+ //Verify that the poll started at the earliest retriable tuple offset
+ List<Long> failedOffsets = new ArrayList<>();
+ for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+ failedOffsets.add(msgId.offset());
+ }
+ InOrder inOrder = inOrder(consumerMock);
+ inOrder.verify(consumerMock).seek(partition, failedOffsets.get(0));
+ inOrder.verify(consumerMock).poll(anyLong());
+ }
+ }
+
+ @Test
+ public void testSpoutWillSkipPartitionsAtTheMaxUncommittedOffsetsLimit() {
+ //This verifies that partitions can't prevent each other from retrying tuples due to the maxUncommittedOffsets limit.
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo);
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+ //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+ records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()));
+ records.put(partitionTwo, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partitionTwo, 0, spoutConfig.getMaxUncommittedOffsets() + 1));
+ int numMessages = spoutConfig.getMaxUncommittedOffsets()*2 + 1;
+
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords<>(records));
+
+ for (int i = 0; i < numMessages; i++) {
+ spout.nextTuple();
+ }
+
+ ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock, times(numMessages)).emit(anyString(), anyList(), messageIds.capture());
+
+ //Now fail a tuple on partition one and verify that it is allowed to retry, because the failed tuple is below the maxUncommittedOffsets limit
+ KafkaSpoutMessageId failedMessageIdPartitionOne = null;
+ for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+ if (msgId.partition() == partition.partition()) {
+ failedMessageIdPartitionOne = msgId;
+ break;
+ }
+ }
+
+ spout.fail(failedMessageIdPartitionOne);
+
+ //Also fail the last tuple from partition two. Since the failed tuple is beyond the maxUncommittedOffsets limit, it should not be retried until earlier messages are acked.
+ KafkaSpoutMessageId failedMessageIdPartitionTwo = null;
+ for (KafkaSpoutMessageId msgId: messageIds.getAllValues()) {
+ if (msgId.partition() == partitionTwo.partition()) {
+ if (failedMessageIdPartitionTwo != null) {
+ if (msgId.offset() >= failedMessageIdPartitionTwo.offset()) {
+ failedMessageIdPartitionTwo = msgId;
+ }
+ } else {
+ failedMessageIdPartitionTwo = msgId;
+ }
+ }
+ }
+
+ spout.fail(failedMessageIdPartitionTwo);
+
+ reset(collectorMock);
+
+ Time.advanceTime(50);
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, failedMessageIdPartitionOne.offset(), 1))));
+
+ spout.nextTuple();
+
+ verify(collectorMock, times(1)).emit(anyString(), anyList(), anyObject());
+
+ InOrder inOrder = inOrder(consumerMock);
+ inOrder.verify(consumerMock).seek(partition, failedMessageIdPartitionOne.offset());
+ //Should not seek on the paused partition
+ inOrder.verify(consumerMock, never()).seek(eq(partitionTwo), anyLong());
+ inOrder.verify(consumerMock).pause(Collections.singleton(partitionTwo));
+ inOrder.verify(consumerMock).poll(anyLong());
+ inOrder.verify(consumerMock).resume(Collections.singleton(partitionTwo));
+
+ reset(collectorMock);
+
+ //Now also check that no more tuples are polled for, since both partitions are at their limits
+ spout.nextTuple();
+
+ verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
+ }
+ }
+
+}
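
Both tests hinge on maxUncommittedOffsets: failed tuples below the limit stay retriable even when new polls are blocked, and each partition is throttled independently. A configuration sketch, assuming the builder exposes setMaxUncommittedOffsets as the counterpart of the spoutConfig.getMaxUncommittedOffsets() calls above (the numbers are illustrative):

import org.apache.storm.kafka.spout.KafkaSpoutConfig;

class MaxUncommittedOffsetsExample {
    // Illustrative values: cap unacked offsets per partition and commit every 2 seconds.
    static KafkaSpoutConfig<String, String> config() {
        return KafkaSpoutConfig.builder("localhost:1234", "topic")
            .setMaxUncommittedOffsets(250)
            .setOffsetCommitPeriodMs(2_000)
            .build();
    }
}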
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
new file mode 100644
index 0000000..09f7fc5
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
+
+import static org.hamcrest.CoreMatchers.is;
+
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+
+public class KafkaSpoutLogCompactionSupportTest {
+
+ private final long offsetCommitPeriodMs = 2_000;
+ private final TopologyContext contextMock = mock(TopologyContext.class);
+ private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+ private final Map<String, Object> conf = new HashMap<>();
+ private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+ private KafkaConsumer<String, String> consumerMock;
+ private KafkaSpoutConfig<String, String> spoutConfig;
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .build();
+ consumerMock = mock(KafkaConsumer.class);
+ }
+
+ @Test
+ public void testCommitSuccessWithOffsetVoids() {
+ //Verify that the commit logic can handle offset voids due to log compaction
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+ List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+ // Offsets emitted are 0,1,2,3,4,<void>,8,9
+ recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 5));
+ recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 8, 2));
+ records.put(partition, recordsForPartition);
+
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords<>(records));
+
+ for (int i = 0; i < recordsForPartition.size(); i++) {
+ spout.nextTuple();
+ }
+
+ ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
+
+ for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+ spout.ack(messageId);
+ }
+
+ // Advance time and then trigger first call to kafka consumer commit; the commit must progress to offset 9
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords<String, String>(Collections.<TopicPartition, List<ConsumerRecord<String, String>>>emptyMap()));
+ spout.nextTuple();
+
+ InOrder inOrder = inOrder(consumerMock);
+ inOrder.verify(consumerMock).commitSync(commitCapture.capture());
+ inOrder.verify(consumerMock).poll(anyLong());
+
+ //Verify that offset 10 was the last committed offset, since this is the offset the spout should resume at
+ Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
+ assertTrue(commits.containsKey(partition));
+ assertEquals(10, commits.get(partition).offset());
+ }
+ }
+
+ @Test
+ public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAway() {
+ /*
+ Verify that failed offsets will only retry if the corresponding message exists.
+ When log compaction is enabled in Kafka it is possible that a tuple can fail,
+ and then be impossible to retry because the message in Kafka has been deleted.
+ The spout needs to quietly ack such tuples to allow commits to progress past the deleted offset.
+ */
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo);
+
+ List<KafkaSpoutMessageId> firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper
+ .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2);
+ reset(collectorMock);
+ List<KafkaSpoutMessageId> secondPartitionMsgIds = SpoutWithMockedConsumerSetupHelper
+ .pollAndEmit(spout, consumerMock, 3, collectorMock, partitionTwo, 0, 1, 2);
+ reset(collectorMock);
+
+ for(int i = 0; i < 3; i++) {
+ spout.fail(firstPartitionMsgIds.get(i));
+ spout.fail(secondPartitionMsgIds.get(i));
+ }
+
+ Time.advanceTime(50);
+
+ //The failed tuples are ready for retry. Make it appear like 0 and 1 on the first partition were compacted away.
+ //In this case the second partition acts as control to verify that we only skip past offsets that are no longer present.
+ Map<TopicPartition, int[]> retryOffsets = new HashMap<>();
+ retryOffsets.put(partition, new int[] {2});
+ retryOffsets.put(partitionTwo, new int[] {0, 1, 2});
+ int expectedEmits = 4; //offset 2 on the first partition, offsets 0-2 on the second partition
+ List<KafkaSpoutMessageId> retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, retryOffsets);
+
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+ spout.nextTuple();
+
+ verify(consumerMock).commitSync(commitCapture.capture());
+ Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue();
+ assertThat(committed.keySet(), is(Collections.singleton(partition)));
+ assertThat("The first partition should have committed up to the first retriable tuple that is not missing", committed.get(partition).offset(), is(2L));
+
+ for(KafkaSpoutMessageId msgId : retryMessageIds) {
+ spout.ack(msgId);
+ }
+
+ //The spout should now commit all the offsets, since all offsets are either acked or were missing when retrying
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+ spout.nextTuple();
+
+ verify(consumerMock, times(2)).commitSync(commitCapture.capture());
+ committed = commitCapture.getValue();
+ assertThat(committed, hasKey(partition));
+ assertThat(committed, hasKey(partitionTwo));
+ assertThat(committed.get(partition).offset(), is(3L));
+ assertThat(committed.get(partitionTwo).offset(), is(3L));
+ }
+ }
+
+ @Test
+ public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAwayWithoutAckingPendingTuples() {
+ //Demonstrate that the spout doesn't ack pending tuples when skipping compacted tuples. The pending tuples should be allowed to finish normally.
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+ List<KafkaSpoutMessageId> firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper
+ .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2);
+ reset(collectorMock);
+
+ spout.fail(firstPartitionMsgIds.get(0));
+ spout.fail(firstPartitionMsgIds.get(2));
+
+ Time.advanceTime(50);
+
+ //The failed tuples are ready for retry. Make it appear like 0 and 1 were compacted away.
+ List<KafkaSpoutMessageId> retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, 1, collectorMock, partition, 2);
+ for(KafkaSpoutMessageId msgId : retryMessageIds) {
+ spout.ack(msgId);
+ }
+
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+ spout.nextTuple();
+
+ verify(consumerMock).commitSync(commitCapture.capture());
+ Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue();
+ assertThat(committed.keySet(), is(Collections.singleton(partition)));
+ assertThat("The first partition should have committed the missing offset, but no further since the next tuple is pending",
+ committed.get(partition).offset(), is(1L));
+
+ spout.ack(firstPartitionMsgIds.get(1));
+
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+ spout.nextTuple();
+
+ verify(consumerMock, times(2)).commitSync(commitCapture.capture());
+ committed = commitCapture.getValue();
+ assertThat(committed.keySet(), is(Collections.singleton(partition)));
+ assertThat("The first partition should have committed all offsets", committed.get(partition).offset(), is(3L));
+ }
+ }
+
+}
\ No newline at end of file
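
The invariant behind these log compaction tests: the committed offset advances past every offset that is either acked or known to be missing (compacted away), and stops at the first pending tuple. The following standalone sketch models that bookkeeping; it is a hypothetical illustration, not the spout's actual offset manager:

import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the commit-progress rule the tests above assert.
class CommitProgressModel {
    // An offset can be skipped when it is acked, or absent from the log (compacted away).
    static long nextCommitOffset(long committed, Set<Long> ackedOrMissing) {
        long next = committed;
        while (ackedOrMissing.contains(next)) {
            next++; // contiguous acked/missing offsets advance the commit position
        }
        return next;
    }

    public static void main(String[] args) {
        Set<Long> done = new HashSet<>();
        for (long o = 0; o <= 9; o++) {
            done.add(o); // offsets 0-4 acked, 5-7 compacted away, 8-9 acked
        }
        System.out.println(nextCommitOffset(0, done)); // prints 10, matching testCommitSuccessWithOffsetVoids
    }
}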
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
new file mode 100644
index 0000000..082cc58
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -0,0 +1,259 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaSpoutMessagingGuaranteeTest {
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+ private final TopologyContext contextMock = mock(TopologyContext.class);
+ private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+ private final Map<String, Object> conf = new HashMap<>();
+ private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+ private KafkaConsumer<String, String> consumerMock;
+
+ @Before
+ public void setUp() {
+ consumerMock = mock(KafkaConsumer.class);
+ }
+
+ @Test
+ public void testAtMostOnceModeCommitsBeforeEmit() throws Exception {
+ //At-most-once mode must commit tuples before they are emitted to the topology to ensure that a spout crash won't cause replays.
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+ .build();
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+ spout.nextTuple();
+
+ when(consumerMock.position(partition)).thenReturn(1L);
+
+ //The spout should have emitted the tuple, and must have committed it before emit
+ InOrder inOrder = inOrder(consumerMock, collectorMock);
+ inOrder.verify(consumerMock).poll(anyLong());
+ inOrder.verify(consumerMock).commitSync(commitCapture.capture());
+ inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+
+ CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+ assertThat(committedOffsets.get(partition).offset(), is(0L));
+ assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
+ }
+
+ private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) {
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()))))
+ .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, spoutConfig.getMaxUncommittedOffsets() - 1, spoutConfig.getMaxUncommittedOffsets()))));
+
+ for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() * 2; i++) {
+ spout.nextTuple();
+ }
+
+ verify(consumerMock, times(2)).poll(anyLong());
+ verify(collectorMock, times(spoutConfig.getMaxUncommittedOffsets() * 2)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+ }
+
+ @Test
+ public void testAtMostOnceModeDisregardsMaxUncommittedOffsets() throws Exception {
+ //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+ .build();
+ doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
+ }
+
+ @Test
+ public void testNoGuaranteeModeDisregardsMaxUncommittedOffsets() throws Exception {
+ //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
+ .build();
+ doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
+ }
+
+ private void doTestModeCannotReplayTuples(KafkaSpoutConfig<String, String> spoutConfig) {
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+ spout.nextTuple();
+
+ ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+ assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+ spout.fail(msgIdCaptor.getValue());
+
+ reset(consumerMock);
+
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 1, 1))));
+
+ spout.nextTuple();
+
+ //The consumer should not seek to retry the failed tuple; it should just continue from the current position
+ verify(consumerMock, never()).seek(eq(partition), anyLong());
+ }
+
+ @Test
+ public void testAtMostOnceModeCannotReplayTuples() throws Exception {
+ //When tuple tracking is enabled, the spout must not replay tuples in at-most-once mode
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+ .setTupleTrackingEnforced(true)
+ .build();
+ doTestModeCannotReplayTuples(spoutConfig);
+ }
+
+ @Test
+ public void testNoGuaranteeModeCannotReplayTuples() throws Exception {
+ //When tuple tracking is enabled, the spout must not replay tuples in no guarantee mode
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
+ .setTupleTrackingEnforced(true)
+ .build();
+ doTestModeCannotReplayTuples(spoutConfig);
+ }
+
+ @Test
+ public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
+ //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+ .setTupleTrackingEnforced(true)
+ .build();
+ try (SimulatedTime time = new SimulatedTime()) {
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+ spout.nextTuple();
+ reset(consumerMock);
+
+ ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+ assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+ spout.ack(msgIdCaptor.getValue());
+
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
+
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<String, String>>>emptyMap()));
+
+ spout.nextTuple();
+
+ verify(consumerMock, never()).commitSync(argThat(new ArgumentMatcher<Map<TopicPartition, OffsetAndMetadata>>() {
+ @Override
+ public boolean matches(Object arg) {
+ Map<TopicPartition, OffsetAndMetadata> castArg = (Map<TopicPartition, OffsetAndMetadata>) arg;
+ return !castArg.containsKey(partition);
+ }
+ }));
+ }
+ }
+
+ @Test
+ public void testNoGuaranteeModeCommitsPolledTuples() throws Exception {
+ //When using the no guarantee mode, the spout must commit tuples periodically, regardless of whether they've been acked
+ KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+ .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
+ .setTupleTrackingEnforced(true)
+ .build();
+
+ try (SimulatedTime time = new SimulatedTime()) {
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+ SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+ spout.nextTuple();
+
+ when(consumerMock.position(partition)).thenReturn(1L);
+
+ ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+ assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
+
+ spout.nextTuple();
+
+ verify(consumerMock).commitAsync(commitCapture.capture(), isNull(OffsetCommitCallback.class));
+
+ CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+ assertThat(committedOffsets.get(partition).offset(), is(1L));
+ assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
+ }
+ }
+
+}
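
In combination, these tests document the three processing guarantees: at-least-once commits only acked offsets via commitSync, at-most-once commits synchronously before emitting so a crash cannot cause replays, and no-guarantee commits polled offsets periodically with commitAsync whether or not they were acked. A configuration sketch of the three modes (broker address and topic are placeholders):

import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;

class ProcessingGuaranteeExamples {
    // AT_LEAST_ONCE: commits only acked offsets (commitSync); failed tuples are replayed.
    static KafkaSpoutConfig<String, String> atLeastOnce() {
        return KafkaSpoutConfig.builder("localhost:1234", "topic")
            .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
            .build();
    }

    // AT_MOST_ONCE: commits synchronously before emit, so a crash cannot cause replays.
    static KafkaSpoutConfig<String, String> atMostOnce() {
        return KafkaSpoutConfig.builder("localhost:1234", "topic")
            .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
            .build();
    }

    // NO_GUARANTEE: commits polled offsets periodically (commitAsync), acked or not.
    static KafkaSpoutConfig<String, String> noGuarantee() {
        return KafkaSpoutConfig.builder("localhost:1234", "topic")
            .setProcessingGuarantee(ProcessingGuarantee.NO_GUARANTEE)
            .build();
    }
}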
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
new file mode 100644
index 0000000..c2c46b5
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.KafkaSpout.TIMER_DELAY_MS;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.KafkaUnitRule;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaSpoutReactivationTest {
+
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+ private final TopologyContext topologyContext = mock(TopologyContext.class);
+ private final Map<String, Object> conf = new HashMap<>();
+ private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+ private final long commitOffsetPeriodMs = 2_000;
+ private KafkaConsumer<String, String> consumerSpy;
+ private KafkaConsumer<String, String> postReactivationConsumerSpy;
+ private KafkaSpout<String, String> spout;
+ private final int maxPollRecords = 10;
+
+ @Before
+ public void setUp() {
+ KafkaSpoutConfig<String, String> spoutConfig =
+ SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+ KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+ SingleTopicKafkaSpoutConfiguration.TOPIC))
+ .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
+ .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+ .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
+ .build();
+ KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactoryDefault<>();
+ this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
+ this.postReactivationConsumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
+ KafkaConsumerFactory<String, String> consumerFactoryMock = mock(KafkaConsumerFactory.class);
+ when(consumerFactoryMock.createConsumer(any(KafkaSpoutConfig.class)))
+ .thenReturn(consumerSpy)
+ .thenReturn(postReactivationConsumerSpy);
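+ //Two consumer spies are handed out because, presumably, deactivation closes the first consumer and reactivation creates a fresh one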
+ this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock);
+ }
+
+ private void prepareSpout(int messageCount) throws Exception {
+ SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
+ SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
+ }
+
+ private KafkaSpoutMessageId emitOne() {
+ ArgumentCaptor<KafkaSpoutMessageId> messageId = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ spout.nextTuple();
+ verify(collector).emit(anyString(), anyList(), messageId.capture());
+ reset(collector);
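+ //Reset the collector so each emitOne() call verifies exactly one new emit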
+ return messageId.getValue();
+ }
+
+ @Test
+ public void testSpoutMustHandleReactivationGracefully() throws Exception {
+ try (Time.SimulatedTime time = new Time.SimulatedTime()) {
+ int messageCount = maxPollRecords * 2;
+ prepareSpout(messageCount);
+
+ //Emit and ack some tuples; ensure some polled tuples remain cached in the spout by emitting fewer than maxPollRecords
+ int beforeReactivationEmits = maxPollRecords - 3;
+ for (int i = 0; i < beforeReactivationEmits - 1; i++) {
+ KafkaSpoutMessageId msgId = emitOne();
+ spout.ack(msgId);
+ }
+
+ KafkaSpoutMessageId ackAfterDeactivateMessageId = emitOne();
+
+ //Cycle spout activation
+ spout.deactivate();
+ SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, commitCapture, beforeReactivationEmits - 1);
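+ //deactivate() should have triggered a final commit of everything acked up to this point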
+ //Tuples may be acked/failed after the spout deactivates, so we have to be able to handle this too
+ spout.ack(ackAfterDeactivateMessageId);
+ spout.activate();
+
+ //Emit and ack the rest
+ for (int i = beforeReactivationEmits; i < messageCount; i++) {
+ KafkaSpoutMessageId msgId = emitOne();
+ spout.ack(msgId);
+ }
+
+ //Commit
+ Time.advanceTime(TIMER_DELAY_MS + commitOffsetPeriodMs);
+ spout.nextTuple();
+
+ //Verify that all emitted tuples are now committed
+ SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(postReactivationConsumerSpy, commitCapture, messageCount);
+
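+ //Verify that no further tuples are emitted once everything has been committed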
+ reset(collector);
+ spout.nextTuple();
+ verify(collector, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index a554a3b..29d2a22 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -15,11 +15,26 @@
*/
package org.apache.storm.kafka.spout;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -27,45 +42,31 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
-
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.when;
-
-import org.junit.Before;
import org.mockito.Captor;
-
-import static org.mockito.Mockito.reset;
-
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.Matchers.hasKey;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
public class KafkaSpoutRebalanceTest {
@Captor
private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+ private final long offsetCommitPeriodMs = 2_000;
+ private final Map<String, Object> conf = new HashMap<>();
private TopologyContext contextMock;
private SpoutOutputCollector collectorMock;
- private Map conf;
private KafkaConsumer<String, String> consumerMock;
private KafkaConsumerFactory<String, String> consumerFactoryMock;
@@ -74,7 +75,6 @@ public class KafkaSpoutRebalanceTest {
MockitoAnnotations.initMocks(this);
contextMock = mock(TopologyContext.class);
collectorMock = mock(SpoutOutputCollector.class);
- conf = new HashMap<>();
consumerMock = mock(KafkaConsumer.class);
consumerFactoryMock = new KafkaConsumerFactory<String, String>(){
@Override
@@ -85,44 +85,39 @@ public class KafkaSpoutRebalanceTest {
}
//Returns messageIds in order of emission
- private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
- //Setup spout with mock consumer so we can get at the rebalance listener
+ private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture) {
+ //Setup spout with mock consumer so we can get at the rebalance listener
spout.open(conf, contextMock, collectorMock);
spout.activate();
- ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
- verify(consumerMock).subscribe(anyList(), rebalanceListenerCapture.capture());
-
//Assign partitions to the spout
ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
- List<TopicPartition> assignedPartitions = new ArrayList<>();
+ Set<TopicPartition> assignedPartitions = new HashSet<>();
assignedPartitions.add(partitionThatWillBeRevoked);
assignedPartitions.add(assignedPartition);
consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+ when(consumerMock.assignment()).thenReturn(assignedPartitions);
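+ //assignment() is stubbed because the spout presumably consults it to decide which partitions it may emit and commit for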
//Make the consumer return a single message for each partition
- Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
- firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord<>(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
- Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
- secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord<>(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(firstPartitionRecords))
- .thenReturn(new ConsumerRecords(secondPartitionRecords))
- .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+ .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partitionThatWillBeRevoked, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partitionThatWillBeRevoked, 0, 1))))
+ .thenReturn(new ConsumerRecords<>(Collections.singletonMap(assignedPartition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(assignedPartition, 0, 1))))
+ .thenReturn(new ConsumerRecords<>(new HashMap<TopicPartition, List<ConsumerRecord<String, String>>>()));
//Emit the messages
spout.nextTuple();
ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- verify(collectorMock).emit(anyString(), anyList(), messageIdForRevokedPartition.capture());
+ verify(collectorMock).emit(Mockito.anyString(), Mockito.anyList(), messageIdForRevokedPartition.capture());
reset(collectorMock);
spout.nextTuple();
ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- verify(collectorMock).emit(anyString(), anyList(), messageIdForAssignedPartition.capture());
+ verify(collectorMock).emit(Mockito.anyString(), Mockito.anyList(), messageIdForAssignedPartition.capture());
//Now rebalance
consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
-
+ when(consumerMock.assignment()).thenReturn(Collections.singleton(assignedPartition));
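+ //Only assignedPartition remains after the rebalance, so acks/fails for the revoked partition should now be ignored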
+
List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
emittedMessageIds.add(messageIdForRevokedPartition.getValue());
emittedMessageIds.add(messageIdForAssignedPartition.getValue());
@@ -132,49 +127,122 @@ public class KafkaSpoutRebalanceTest {
@Test
public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
//Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
- KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock);
- String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
- TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
- TopicPartition assignedPartition = new TopicPartition(topic, 2);
-
- //Emit a message on each partition and revoke the first partition
- List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
-
- //Ack both emitted tuples
- spout.ack(emittedMessageIds.get(0));
- spout.ack(emittedMessageIds.get(1));
-
- //Ensure the commit timer has expired
- Thread.sleep(510);
-
- //Make the spout commit any acked tuples
- spout.nextTuple();
- //Verify that it only committed the message on the assigned partition
- verify(consumerMock).commitSync(commitCapture.capture());
-
- Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
- assertThat(commitCaptureMap, hasKey(assignedPartition));
- assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ Subscription subscriptionMock = mock(Subscription.class);
+ doNothing()
+ .when(subscriptionMock)
+ .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
+ KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .build(), consumerFactoryMock);
+ String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+ TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+ TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+ //Emit a message on each partition and revoke the first partition
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
+ spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
+
+ //Ack both emitted tuples
+ spout.ack(emittedMessageIds.get(0));
+ spout.ack(emittedMessageIds.get(1));
+
+ //Ensure the commit timer has expired
+ Time.advanceTime(offsetCommitPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+ //Make the spout commit any acked tuples
+ spout.nextTuple();
+ //Verify that it only committed the message on the assigned partition
+ verify(consumerMock, times(1)).commitSync(commitCapture.capture());
+
+ Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
+ assertThat(commitCaptureMap, hasKey(assignedPartition));
+ assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+ }
}
-
+
@Test
public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
//Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
+ ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ Subscription subscriptionMock = mock(Subscription.class);
+ doNothing()
+ .when(subscriptionMock)
+ .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
- KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock);
+ KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+ .setOffsetCommitPeriodMs(10)
+ .setRetry(retryServiceMock)
+ .build(), consumerFactoryMock);
String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
TopicPartition assignedPartition = new TopicPartition(topic, 2);
-
+
+ when(retryServiceMock.getMessageId(Mockito.any(ConsumerRecord.class)))
+ .thenReturn(new KafkaSpoutMessageId(partitionThatWillBeRevoked, 0))
+ .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0));
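+ //The retry service supplies the message ids, so the stub must hand back ids bound to the right partitions for the rebalance filtering to be testable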
+
//Emit a message on each partition and revoke the first partition
- List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
+ spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
+
+ //Check that only two message ids were generated
+ verify(retryServiceMock, times(2)).getMessageId(Mockito.any(ConsumerRecord.class));
//Fail both emitted tuples
spout.fail(emittedMessageIds.get(0));
spout.fail(emittedMessageIds.get(1));
-
+
//Check that only the tuple on the currently assigned partition is retried
verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
verify(retryServiceMock).schedule(emittedMessageIds.get(1));
}
+
+ @Test
+ public void testReassignPartitionSeeksForOnlyNewPartitions() {
+ /*
+ * When partitions are reassigned, the spout should seek with the first poll offset strategy for new partitions.
+ * Previously assigned partitions should be left alone, since the spout keeps the emitted and acked state for those.
+ */
+
+ ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ Subscription subscriptionMock = mock(Subscription.class);
+ doNothing()
+ .when(subscriptionMock)
+ .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
+ KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+ .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
+ .build(), consumerFactoryMock);
+ String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+ TopicPartition assignedPartition = new TopicPartition(topic, 1);
+ TopicPartition newPartition = new TopicPartition(topic, 2);
+
+ //Setup spout with mock consumer so we can get at the rebalance listener
+ spout.open(conf, contextMock, collectorMock);
+ spout.activate();
+
+ //Assign partitions to the spout
+ ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+ Set<TopicPartition> assignedPartitions = new HashSet<>();
+ assignedPartitions.add(assignedPartition);
+ consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+ reset(consumerMock);
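+ //Reset so that only interactions caused by the upcoming rebalance are verified below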
+
+ //Set up committed so it looks like some messages have been committed on each partition
+ long committedOffset = 500;
+ when(consumerMock.committed(assignedPartition)).thenReturn(new OffsetAndMetadata(committedOffset));
+ when(consumerMock.committed(newPartition)).thenReturn(new OffsetAndMetadata(committedOffset));
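+ //With UNCOMMITTED_EARLIEST and a committed offset present, newly assigned partitions are expected to be sought to the committed offset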
+
+ //Now rebalance and add a new partition
+ consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
+ Set<TopicPartition> newAssignedPartitions = new HashSet<>();
+ newAssignedPartitions.add(assignedPartition);
+ newAssignedPartitions.add(newPartition);
+ consumerRebalanceListener.onPartitionsAssigned(newAssignedPartitions);
+
+ //This partition was previously assigned, so the consumer position shouldn't change
+ verify(consumerMock, never()).seek(eq(assignedPartition), anyLong());
+ //This partition is new, and should start at the committed offset
+ verify(consumerMock).seek(newPartition, committedOffset);
+ }
}