You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/08 09:05:04 UTC

[03/14] storm git commit: STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch: copied external/storm-kafka-client from 1.x-branch

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
new file mode 100644
index 0000000..681953d
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.junit.Test;
+
+public class DefaultRecordTranslatorTest { // Checks the stream name, declared fields and tuple layout of DefaultRecordTranslator.
+    @Test
+    public void testBasic() {
+        DefaultRecordTranslator<String, String> trans = new DefaultRecordTranslator<>();
+        assertEquals(Arrays.asList("default"), trans.streams());
+        assertEquals(new Fields("topic", "partition", "offset", "key", "value"), trans.getFieldsFor("default"));
+        ConsumerRecord<String, String> cr = new ConsumerRecord<>("TOPIC", 100, 100, "THE KEY", "THE VALUE");
+        assertEquals(Arrays.asList("TOPIC", 100, 100L, "THE KEY", "THE VALUE"), trans.apply(cr)); // second element is an Integer, third a Long
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
new file mode 100644
index 0000000..0467383
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *  
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.KafkaUnitRule;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+public abstract class KafkaSpoutAbstractTest {
+    @Rule
+    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+    // Collaborators that prepareSpout() passes to the spout
+    final TopologyContext topologyContext = mock(TopologyContext.class);
+    final Map<String, Object> conf = new HashMap<>();
+    final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    final long commitOffsetPeriodMs; // added to KafkaSpout.TIMER_DELAY_MS when simulated time is advanced before a commit
+    // consumerSpy is what the spout's consumer factory returns; both fields are assigned in setUp()
+    KafkaConsumer<String, String> consumerSpy;
+    KafkaSpout<String, String> spout;
+    // Receives the offsets map handed to commitSync(); presumably injected by MockitoAnnotations.initMocks in setUp()
+    @Captor
+    ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+    private Time.SimulatedTime simulatedTime; // created in setUp(), closed in tearDown()
+    private KafkaSpoutConfig<String, String> spoutConfig; // obtained from createSpoutConfig() in setUp()
+
+    /**
+     * Stores the commit period; concrete subclasses are expected to call this from their default constructor.
+     * @param commitOffsetPeriodMs commit offset period used when committing and verifying committed messages
+     */
+    protected KafkaSpoutAbstractTest(long commitOffsetPeriodMs) {
+        this.commitOffsetPeriodMs = commitOffsetPeriodMs;
+    }
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        // The config has to exist first: createConsumerSpy() reads spoutConfig
+        spoutConfig = createSpoutConfig();
+
+        consumerSpy = createConsumerSpy();
+        // The factory passed here always returns consumerSpy, so tests can verify calls made on the consumer
+        spout = new KafkaSpout<>(spoutConfig, createConsumerFactory());
+        // Closed again in tearDown()
+        simulatedTime = new Time.SimulatedTime();
+    }
+
+    private KafkaConsumerFactory<String, String> createConsumerFactory() {
+        // Whatever config is requested, the factory hands out the current value of the consumerSpy field.
+        final KafkaConsumerFactory<String, String> spyReturningFactory = new KafkaConsumerFactory<String, String>() {
+            @Override
+            public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> ignoredConfig) {
+                return consumerSpy;
+            }
+        };
+        return spyReturningFactory;
+    }
+
+    KafkaConsumer<String, String> createConsumerSpy() { // NOTE(review): package-private, presumably so subclasses can supply a different consumer
+        return spy(new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig)); // Mockito spy around the default factory's consumer
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        simulatedTime.close(); // closes the SimulatedTime instance created in setUp()
+    }
+
+    abstract KafkaSpoutConfig<String, String> createSpoutConfig(); // called from setUp() before the consumer spy and the spout are created
+
+    void prepareSpout(int messageCount) throws Exception { // first fills the test topic, then initializes the spout with the shared mocks
+        SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collectorMock);
+    }
+
+    /**
+     * Helper method that performs the following steps in sequence:
+     * <ol>
+     *     <li>spout.nextTuple()</li>
+     *     <li>verify messageId</li>
+     *     <li>spout.ack(msgId)</li>
+     *     <li>reset(collector) to be able to reuse mock</li>
+     * </ol>
+     *
+     * @param offset offset of message to be verified
+     * @return {@link ArgumentCaptor} of the messageId verified
+     */
+    ArgumentCaptor<Object> nextTuple_verifyEmitted_ack_resetCollector(int offset) {
+        spout.nextTuple();
+
+        ArgumentCaptor<Object> messageId = verifyMessageEmitted(offset);
+
+        spout.ack(messageId.getValue());
+
+        reset(collectorMock);
+
+        return messageId;
+    }
+
+    // Expects one emit of (TOPIC, offset, offset) with the offset rendered as a decimal string; returns the captor holding the message id
+    ArgumentCaptor<Object> verifyMessageEmitted(int offset) {
+        final String offsetText = Integer.toString(offset);
+        final Values expectedTuple = new Values(SingleTopicKafkaSpoutConfiguration.TOPIC, offsetText, offsetText);
+        final ArgumentCaptor<Object> idCaptor = ArgumentCaptor.forClass(Object.class);
+
+        verify(collectorMock).emit(
+            eq(SingleTopicKafkaSpoutConfiguration.STREAM),
+            eq(expectedTuple),
+            idCaptor.capture());
+
+        return idCaptor;
+    }
+
+    void commitAndVerifyAllMessagesCommitted(long msgCount) {
+        // Push simulated time past the commit period so that the next nextTuple() call commits
+        final long commitTimerExpiryMs = commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS;
+        Time.advanceTime(commitTimerExpiryMs);
+        // This call is the one expected to commit the offsets
+        spout.nextTuple();
+
+        verifyAllMessagesCommitted(msgCount);
+    }
+
+    /*
+     * Checks that commitSync was invoked once on the consumer spy,
+     * that the commit covers a single topic partition,
+     * and that the committed offset equals messageCount. The spy is reset afterwards.
+     */
+    void verifyAllMessagesCommitted(long messageCount) {
+        verify(consumerSpy).commitSync(commitCapture.capture());
+
+        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+        assertThat("Expected commits for only one topic partition", committedOffsets.size(), is(1));
+
+        final OffsetAndMetadata committed = committedOffsets.values().iterator().next();
+        assertThat("Expected committed offset to cover all emitted messages", committed.offset(), is(messageCount));
+
+        reset(consumerSpy);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
new file mode 100644
index 0000000..90e906b
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.hamcrest.CoreMatchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class KafkaSpoutConfigTest {
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none(); // NOTE(review): not referenced by any test visible in this class
+    
+    @Test
+    public void testBasic() { // defaults of a config built from nothing but bootstrap servers and a topic
+        final KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:1234", "topic").build();
+        final Map<String, Object> expectedKafkaProps = new HashMap<>();
+        expectedKafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1234");
+        expectedKafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        expectedKafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        expectedKafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        expectedKafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        assertEquals(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST, spoutConfig.getFirstPollOffsetStrategy());
+        assertNull(spoutConfig.getConsumerGroupId());
+        assertTrue(spoutConfig.getTranslator() instanceof DefaultRecordTranslator);
+        assertEquals(expectedKafkaProps, spoutConfig.getKafkaProps());
+        assertEquals(KafkaSpoutConfig.DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS, spoutConfig.getMetricsTimeBucketSizeInSecs());
+    }
+
+    @Test
+    public void testSetEmitNullTuplesToTrue() {
+        // Switch the flag on through the builder, then read it back from the built config.
+        final KafkaSpoutConfig<String, String> spoutConfig =
+            KafkaSpoutConfig.builder("localhost:1234", "topic").setEmitNullTuples(true).build();
+        final boolean emitsNullTuples = spoutConfig.isEmitNullTuples();
+        assertTrue("Failed to set emit null tuples to true", emitsNullTuples);
+    }
+    
+    @Test
+    public void testShouldNotChangeAutoOffsetResetPolicyWhenNotUsingAtLeastOnce() {
+        // With AT_MOST_ONCE selected, no AUTO_OFFSET_RESET_CONFIG entry is expected in the Kafka props.
+        final KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .build();
+        final Object autoOffsetResetPolicy = spoutConfig.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+        assertThat("When at-least-once is not specified, the spout should use the Kafka default auto offset reset policy", autoOffsetResetPolicy, nullValue());
+    }
+    
+    @Test
+    public void testWillRespectExplicitAutoOffsetResetPolicy() {
+        // A value set for AUTO_OFFSET_RESET_CONFIG through setProp is expected to survive build() unchanged.
+        final KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
+            .build();
+        final String autoOffsetResetPolicy = (String) spoutConfig.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+        assertThat("Should allow users to pick a different auto offset reset policy than the one recommended for the at-least-once processing guarantee", autoOffsetResetPolicy, is("none"));
+    }
+    
+    @Test
+    public void testCanConfigureWithExplicitTrueBooleanAutoCommitMode() {
+        /*
+         * Users should no longer set "enable.auto.commit" in the consumer config themselves, because
+         * setProcessingGuarantee on KafkaSpoutConfig sets it automatically. For backward compatibility the property
+         * still has to be accepted for a few releases and mapped to a processing guarantee that corresponds to it.
+         */
+        final KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
+            .build();
+        final KafkaSpoutConfig.ProcessingGuarantee guarantee = spoutConfig.getProcessingGuarantee();
+
+        assertThat("When setting enable auto commit to true explicitly the spout should use the 'none' processing guarantee",
+            guarantee, is(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE));
+    }
+    
+    @Test
+    public void testCanConfigureWithExplicitFalseBooleanAutoCommitMode() {
+        // A boolean false for ENABLE_AUTO_COMMIT_CONFIG is expected to select AT_LEAST_ONCE.
+        final KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false).build();
+        final KafkaSpoutConfig.ProcessingGuarantee guarantee = spoutConfig.getProcessingGuarantee();
+        assertThat("When setting enable auto commit to false explicitly the spout should use the 'at-least-once' processing guarantee",
+            guarantee, is(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE));
+    }
+    
+    @Test
+    public void testCanConfigureWithExplicitTrueStringAutoCommitMode() {
+        // The string "true" for ENABLE_AUTO_COMMIT_CONFIG is expected to select NO_GUARANTEE.
+        final KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true").build();
+        final KafkaSpoutConfig.ProcessingGuarantee guarantee = spoutConfig.getProcessingGuarantee();
+        assertThat("When setting enable auto commit to true explicitly the spout should use the 'none' processing guarantee",
+            guarantee, is(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE));
+    }
+    
+    @Test
+    public void testCanConfigureWithExplicitFalseStringAutoCommitMode() {
+        // The string "false" for ENABLE_AUTO_COMMIT_CONFIG is expected to select AT_LEAST_ONCE.
+        final KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false").build();
+        final KafkaSpoutConfig.ProcessingGuarantee guarantee = spoutConfig.getProcessingGuarantee();
+        assertThat("When setting enable auto commit explicitly to false the spout should use the 'at-least-once' processing guarantee",
+            guarantee, is(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE));
+    }
+    
+    @Test
+    public void testCanGetKeyDeserializerWhenUsingDefaultBuilder() {
+        // No key deserializer is configured here, so whatever the builder defaults to is inspected.
+        final KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:1234", "topic").build();
+        final Object keyDeserializer = spoutConfig.getKeyDeserializer();
+        assertThat("When using the default builder methods, the key deserializer should default to StringDeserializer",
+            keyDeserializer, instanceOf(StringDeserializer.class));
+    }
+    
+    @Test
+    public void testCanGetValueDeserializerWhenUsingDefaultBuilder() {
+        // No value deserializer is configured here, so whatever the builder defaults to is inspected.
+        final KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:1234", "topic").build();
+        final Object valueDeserializer = spoutConfig.getValueDeserializer();
+        assertThat("When using the default builder methods, the value deserializer should default to StringDeserializer",
+            valueDeserializer, instanceOf(StringDeserializer.class));
+    }
+    
+    @Test
+    public void testCanOverrideDeprecatedDeserializerClassWithKafkaProps() {
+        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setKey(StringDeserializer.class)
+            .setValue(StringDeserializer.class)
+            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+            .build();
+        final Map<String, ?> kafkaProps = spoutConfig.getKafkaProps(); // setProp came last for both key and value
+        assertThat("The last set key deserializer should be used, regardless of how it is set",
+            kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
+        assertThat("The last set value deserializer should be used, regardless of how it is set",
+            kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
+    }
+    
+    private static class SerializableStringDeserializer implements SerializableDeserializer<String> { // forwards every call to a StringDeserializer
+
+        private final StringDeserializer delegate = new StringDeserializer();
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+            delegate.configure(configs, isKey);
+        }
+
+        @Override
+        public String deserialize(String topic, byte[] data) {
+            return delegate.deserialize(topic, data);
+        }
+
+        @Override
+        public void close() {
+            delegate.close();
+        }
+    }
+    
+    @Test
+    public void testCanOverrideDeprecatedDeserializerInstanceWithKafkaProps() {
+        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setKey(new SerializableStringDeserializer())
+            .setValue(new SerializableStringDeserializer())
+            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+            .build();
+        final Map<String, ?> kafkaProps = spoutConfig.getKafkaProps(); // setProp came last for both key and value
+        assertThat("The last set key deserializer should be used, regardless of how it is set",
+            kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
+        assertThat("The last set value deserializer should be used, regardless of how it is set",
+            kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
+    }
+    
+    @Test
+    public void testCanOverrideKafkaPropsWithDeprecatedDeserializerSetter() {
+        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+            .setKey(new SerializableStringDeserializer())
+            .setValue(new SerializableStringDeserializer())
+            .build();
+        final Map<String, ?> kafkaProps = spoutConfig.getKafkaProps(); // setKey/setValue came last this time
+        assertThat("The last set key deserializer should be used, regardless of how it is set",
+            kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class));
+        assertThat("The last set value deserializer should be used, regardless of how it is set",
+            kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class));
+    }
+    
+    @Test
+    public void testCanMixOldAndNewDeserializerSetter() {
+        KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+            .setKey(new SerializableStringDeserializer())
+            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
+            .setValue(new SerializableStringDeserializer())
+            .build();
+        final Map<String, ?> kafkaProps = spoutConfig.getKafkaProps(); // key: setProp came last; value: setValue came last
+        assertThat("The last set key deserializer should be used, regardless of how it is set",
+            kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
+        assertThat("The last set value deserializer should be used, regardless of how it is set",
+            kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class));
+    }
+
+    @Test
+    public void testMetricsTimeBucketSizeInSecs() {
+        final int bucketSizeSecs = 100;
+        final KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setMetricsTimeBucketSizeInSecs(bucketSizeSecs).build();
+        // The bucket size given to the builder is expected to be reported back unchanged.
+        assertEquals(bucketSizeSecs, spoutConfig.getMetricsTimeBucketSizeInSecs());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
new file mode 100755
index 0000000..dbba04b
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.never;
+
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.mockito.InOrder;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.eq;
+
+public class KafkaSpoutEmitTest {
+
+    private final long offsetCommitPeriodMs = 2_000; // passed to setOffsetCommitPeriodMs in setUp()
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1); // partition 1 of the test topic
+    private KafkaConsumer<String, String> consumerMock; // replaced with a fresh mock in setUp()
+    private KafkaSpoutConfig<String, String> spoutConfig; // rebuilt in setUp()
+
+    @Before
+    public void setUp() {
+        consumerMock = mock(KafkaConsumer.class);
+        final Subscription subscriptionMock = mock(Subscription.class);
+        spoutConfig = createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs).build();
+    }
+
+    @Test
+    public void testNextTupleEmitsAtMostOneTuple() {
+        //Storm throttles the spout according to maxSpoutPending, which requires that
+        //a single call to nextTuple emits no more than one message
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+        List<ConsumerRecord<String, String>> tenRecords = SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 10);
+        Map<TopicPartition, List<ConsumerRecord<String, String>>> polledRecords = Collections.singletonMap(partition, tenRecords);
+        ConsumerRecords<String, String> pollResult = new ConsumerRecords<>(polledRecords);
+
+        when(consumerMock.poll(anyLong())).thenReturn(pollResult);
+
+        spout.nextTuple();
+
+        verify(collectorMock, times(1)).emit(anyString(), anyList(), anyObject());
+    }
+
+    @Test
+    public void testNextTupleEmitsFailedMessagesEvenWhenMaxUncommittedOffsetsIsExceeded() throws IOException {
+        //The spout must reemit failed messages waiting for retry even if it is not allowed to poll for new messages due to maxUncommittedOffsets being exceeded
+
+        //Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            int numRecords = spoutConfig.getMaxUncommittedOffsets();
+            //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+            records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, numRecords));
+
+            when(consumerMock.poll(anyLong()))
+                .thenReturn(new ConsumerRecords<>(records));
+            //One nextTuple call per record, so that numRecords emits can be verified afterwards
+            for (int i = 0; i < numRecords; i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), messageIds.capture());
+            //Fail every message id captured from the first round of emits
+            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+                spout.fail(messageId);
+            }
+            //Forget the first round of emits so that the retry round is counted on its own
+            reset(collectorMock);
+
+            Time.advanceTime(50);
+            //No backoff for test retry service, just check that messages will retry immediately
+            for (int i = 0; i < numRecords; i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<KafkaSpoutMessageId> retryMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock, times(numRecords)).emit(anyString(), anyList(), retryMessageIds.capture());
+
+            //Verify that the poll started at the earliest retriable tuple offset
+            List<Long> failedOffsets = new ArrayList<>();
+            for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+                failedOffsets.add(msgId.offset());
+            }
+            InOrder inOrder = inOrder(consumerMock);
+            inOrder.verify(consumerMock).seek(partition, failedOffsets.get(0));
+            inOrder.verify(consumerMock).poll(anyLong());
+        }
+    }
+
+    @Test
+    public void testSpoutWillSkipPartitionsAtTheMaxUncommittedOffsetsLimit() {
+        //This verifies that partitions can't prevent each other from retrying tuples due to the maxUncommittedOffsets limit.
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo);
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
+            records.put(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()));
+            records.put(partitionTwo, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partitionTwo, 0, spoutConfig.getMaxUncommittedOffsets() + 1));
+            int numMessages = spoutConfig.getMaxUncommittedOffsets()*2 + 1;
+            //This batch is what poll is stubbed to return until poll is re-stubbed further down
+            when(consumerMock.poll(anyLong()))
+                .thenReturn(new ConsumerRecords<>(records));
+
+            for (int i = 0; i < numMessages; i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock, times(numMessages)).emit(anyString(), anyList(), messageIds.capture());
+            
+            //Now fail a tuple on partition one and verify that it is allowed to retry, because the failed tuple is below the maxUncommittedOffsets limit
+            KafkaSpoutMessageId failedMessageIdPartitionOne = null;
+            for (KafkaSpoutMessageId msgId : messageIds.getAllValues()) {
+                if (msgId.partition() == partition.partition()) {
+                    failedMessageIdPartitionOne = msgId;
+                    break;
+                }
+            }
+            
+            spout.fail(failedMessageIdPartitionOne);
+
+            //Also fail the last tuple from partition two. Since the failed tuple is beyond the maxUncommittedOffsets limit, it should not be retried until earlier messages are acked.
+            KafkaSpoutMessageId failedMessageIdPartitionTwo = null;
+            for (KafkaSpoutMessageId msgId: messageIds.getAllValues()) {
+                if (msgId.partition() == partitionTwo.partition()) {
+                    if (failedMessageIdPartitionTwo != null) {
+                        if (msgId.offset() >= failedMessageIdPartitionTwo.offset()) {
+                            failedMessageIdPartitionTwo = msgId;
+                        }
+                    } else {
+                        failedMessageIdPartitionTwo = msgId;
+                    }
+                }
+            }
+
+            spout.fail(failedMessageIdPartitionTwo);
+            
+            reset(collectorMock);
+            
+            Time.advanceTime(50);
+            when(consumerMock.poll(anyLong()))
+                .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, failedMessageIdPartitionOne.offset(), 1))));
+            //The re-stubbed poll only returns the record that was failed on partition one
+            spout.nextTuple();
+            
+            verify(collectorMock, times(1)).emit(anyString(), anyList(), anyObject());
+            
+            InOrder inOrder = inOrder(consumerMock);
+            inOrder.verify(consumerMock).seek(partition, failedMessageIdPartitionOne.offset());
+            //Should not seek on the paused partition
+            inOrder.verify(consumerMock, never()).seek(eq(partitionTwo), anyLong());
+            inOrder.verify(consumerMock).pause(Collections.singleton(partitionTwo));
+            inOrder.verify(consumerMock).poll(anyLong());
+            inOrder.verify(consumerMock).resume(Collections.singleton(partitionTwo));
+            
+            reset(collectorMock);
+            
+            //Now also check that no more tuples are polled for, since both partitions are at their limits
+            spout.nextTuple();
+
+            verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
new file mode 100644
index 0000000..09f7fc5
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
+
+import static org.hamcrest.CoreMatchers.is;
+
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+
+public class KafkaSpoutLogCompactionSupportTest {
+    //Tests KafkaSpout commit and retry behavior when polled offsets have gaps (as with Kafka log compaction). The KafkaConsumer is a Mockito mock.
+    private final long offsetCommitPeriodMs = 2_000; //Passed to the spout config in setUp(); tests advance simulated time past it to trigger a commit
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaSpoutConfig<String, String> spoutConfig;
+
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture; //Captures the offsets map passed to consumerMock.commitSync(...)
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this); //Initializes the @Captor field
+        spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .build();
+        consumerMock = mock(KafkaConsumer.class); //Fresh consumer mock for every test
+    }
+
+    @Test
+    public void testCommitSuccessWithOffsetVoids() {
+        //Verify that the commit logic can handle offset voids due to log compaction (here offsets 5-7 are never returned by the consumer)
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+            Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+            List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+            // Offsets emitted are 0,1,2,3,4,<void>,8,9
+            recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 5));
+            recordsForPartition.addAll(SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 8, 2));
+            records.put(partition, recordsForPartition);
+
+            when(consumerMock.poll(anyLong()))
+                    .thenReturn(new ConsumerRecords<>(records));
+
+            for (int i = 0; i < recordsForPartition.size(); i++) {
+                spout.nextTuple();
+            }
+
+            ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
+
+            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+                spout.ack(messageId);
+            }
+
+            // Advance time and then trigger first call to kafka consumer commit; the commit must progress past offset 9, i.e. to committed offset 10
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            when(consumerMock.poll(anyLong()))
+                    .thenReturn(new ConsumerRecords<String, String>(Collections.<TopicPartition, List<ConsumerRecord<String, String>>>emptyMap()));
+            spout.nextTuple();
+
+            InOrder inOrder = inOrder(consumerMock);
+            inOrder.verify(consumerMock).commitSync(commitCapture.capture());
+            inOrder.verify(consumerMock).poll(anyLong());
+
+            //Verify that offset 10 was the last committed offset, since this is the offset the spout should resume at
+            Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
+            assertTrue(commits.containsKey(partition));
+            assertEquals(10, commits.get(partition).offset());
+        }
+    }
+    
+    @Test
+    public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAway() {
+        /*
+          Verify that failed offsets will only retry if the corresponding message exists.
+          When log compaction is enabled in Kafka it is possible that a tuple can fail,
+          and then be impossible to retry because the message in Kafka has been deleted.
+          The spout needs to quietly ack such tuples to allow commits to progress past the deleted offset.
+         */
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            TopicPartition partitionTwo = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 2);
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition, partitionTwo);
+            
+            List<KafkaSpoutMessageId> firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper
+                .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2);
+            reset(collectorMock);
+            List<KafkaSpoutMessageId> secondPartitionMsgIds = SpoutWithMockedConsumerSetupHelper
+                .pollAndEmit(spout, consumerMock, 3, collectorMock, partitionTwo, 0, 1, 2);
+            reset(collectorMock);
+            
+            for(int i = 0; i < 3; i++) {
+                spout.fail(firstPartitionMsgIds.get(i));
+                spout.fail(secondPartitionMsgIds.get(i));
+            }
+            
+            Time.advanceTime(50);
+            
+            //The failed tuples are ready for retry. Make it appear like 0 and 1 on the first partition were compacted away.
+            //In this case the second partition acts as control to verify that we only skip past offsets that are no longer present.
+            Map<TopicPartition, int[]> retryOffsets = new HashMap<>();
+            retryOffsets.put(partition, new int[] {2});
+            retryOffsets.put(partitionTwo, new int[] {0, 1, 2});
+            int expectedEmits = 4; //offset 2 on the first partition, offsets 0-2 on the second partition
+            List<KafkaSpoutMessageId> retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, expectedEmits, collectorMock, retryOffsets);
+            
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+            //None of the retried tuples has been acked yet, so this commit is expected to cover only the missing offsets 0 and 1 on the first partition
+            verify(consumerMock).commitSync(commitCapture.capture());
+            Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue();
+            assertThat(committed.keySet(), is(Collections.singleton(partition)));
+            assertThat("The first partition should have committed up to the first retriable tuple that is not missing", committed.get(partition).offset(), is(2L));
+            
+            for(KafkaSpoutMessageId msgId : retryMessageIds) {
+                spout.ack(msgId);
+            }
+            
+            //The spout should now commit all the offsets, since all offsets are either acked or were missing when retrying
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+
+            verify(consumerMock, times(2)).commitSync(commitCapture.capture());
+            committed = commitCapture.getValue();
+            assertThat(committed, hasKey(partition));
+            assertThat(committed, hasKey(partitionTwo));
+            assertThat(committed.get(partition).offset(), is(3L));
+            assertThat(committed.get(partitionTwo).offset(), is(3L));
+        }
+    }
+    
+    @Test
+    public void testWillSkipRetriableTuplesIfOffsetsAreCompactedAwayWithoutAckingPendingTuples() {
+        //Demonstrate that the spout doesn't ack pending tuples when skipping compacted tuples. The pending tuples should be allowed to finish normally.
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+            
+            List<KafkaSpoutMessageId> firstPartitionMsgIds = SpoutWithMockedConsumerSetupHelper
+                .pollAndEmit(spout, consumerMock, 3, collectorMock, partition, 0, 1, 2);
+            reset(collectorMock);
+            //Fail the first and third emitted tuples; the second is left pending until it is acked further down
+            spout.fail(firstPartitionMsgIds.get(0));            
+            spout.fail(firstPartitionMsgIds.get(2));
+            
+            Time.advanceTime(50);
+            
+            //The failed tuples are ready for retry. Make it appear like 0 and 1 were compacted away, so that only offset 2 is expected to be re-emitted.
+            List<KafkaSpoutMessageId> retryMessageIds = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, 1, collectorMock, partition, 2);
+            for(KafkaSpoutMessageId msgId : retryMessageIds) {
+                spout.ack(msgId);
+            }
+            
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+            
+            verify(consumerMock).commitSync(commitCapture.capture());
+            Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue();
+            assertThat(committed.keySet(), is(Collections.singleton(partition)));
+            assertThat("The first partition should have committed the missing offset, but no further since the next tuple is pending",
+                committed.get(partition).offset(), is(1L));
+            
+            spout.ack(firstPartitionMsgIds.get(1));
+            
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+            
+            verify(consumerMock, times(2)).commitSync(commitCapture.capture());
+            committed = commitCapture.getValue();
+            assertThat(committed.keySet(), is(Collections.singleton(partition)));
+            assertThat("The first partition should have committed all offsets", committed.get(partition).offset(), is(3L));
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
new file mode 100644
index 0000000..082cc58
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -0,0 +1,259 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaSpoutMessagingGuaranteeTest {
+    //Tests KafkaSpout behavior under the AT_MOST_ONCE and NO_GUARANTEE processing guarantees, using a mocked KafkaConsumer.
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture; //Captures the offsets map passed to commitSync/commitAsync
+
+    private final TopologyContext contextMock = mock(TopologyContext.class);
+    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+    private KafkaConsumer<String, String> consumerMock;
+
+    @Before
+    public void setUp() {
+        consumerMock = mock(KafkaConsumer.class);
+    }
+
+    @Test
+    public void testAtMostOnceModeCommitsBeforeEmit() throws Exception {
+        //At-most-once mode must commit tuples before they are emitted to the topology to ensure that a spout crash won't cause replays.
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .build();
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+            SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+        spout.nextTuple();
+
+        when(consumerMock.position(partition)).thenReturn(1L); //NOTE(review): stubbed after nextTuple(), so it cannot affect the commit captured below (asserted as offset 0) - confirm whether it was meant to precede nextTuple()
+
+        //The spout should have emitted the tuple, and must have committed it before emit
+        InOrder inOrder = inOrder(consumerMock, collectorMock);
+        inOrder.verify(consumerMock).poll(anyLong());
+        inOrder.verify(consumerMock).commitSync(commitCapture.capture());
+        inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+
+        CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+        assertThat(committedOffsets.get(partition).offset(), is(0L));
+        assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
+    }
+
+    private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) {
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+        //Shared body: two polls return maxUncommittedOffsets records each, and every record must be emitted even though that exceeds the limit
+        when(consumerMock.poll(anyLong()))
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()))))
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, spoutConfig.getMaxUncommittedOffsets() - 1, spoutConfig.getMaxUncommittedOffsets()))));
+        //NOTE(review): assuming createRecords(partition, firstOffset, count), the second batch starts at maxUncommittedOffsets - 1 and so repeats the last offset of the first batch - confirm this is intended
+        for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() * 2; i++) {
+            spout.nextTuple();
+        }
+
+        verify(consumerMock, times(2)).poll(anyLong());
+        verify(collectorMock, times(spoutConfig.getMaxUncommittedOffsets() * 2)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+    }
+
+    @Test
+    public void testAtMostOnceModeDisregardsMaxUncommittedOffsets() throws Exception {
+        //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .build();
+        doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
+    }
+
+    @Test
+    public void testNoGuaranteeModeDisregardsMaxUncommittedOffsets() throws Exception {
+        //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
+            .build();
+        doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
+    }
+
+    private void doTestModeCannotReplayTuples(KafkaSpoutConfig<String, String> spoutConfig) {
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+        //Shared body: emit one tuple, fail it, then check that the following nextTuple() does not seek the consumer on the partition
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+            SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+        spout.nextTuple();
+
+        ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+        assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+        spout.fail(msgIdCaptor.getValue());
+
+        reset(consumerMock); //NOTE(review): reset() also discards any stubbing installed on the mock by setupSpout - confirm the spout still reaches poll() after this
+
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+            SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 1, 1))));
+
+        spout.nextTuple();
+
+        //The consumer should not be seeking to retry the failed tuple, it should just be continuing from the current position
+        verify(consumerMock, never()).seek(eq(partition), anyLong());
+    }
+
+    @Test
+    public void testAtMostOnceModeCannotReplayTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not replay tuples in at-most-once mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .setTupleTrackingEnforced(true)
+            .build();
+        doTestModeCannotReplayTuples(spoutConfig);
+    }
+
+    @Test
+    public void testNoGuaranteeModeCannotReplayTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not replay tuples in no guarantee mode
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
+            .setTupleTrackingEnforced(true)
+            .build();
+        doTestModeCannotReplayTuples(spoutConfig);
+    }
+
+    @Test
+    public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .setTupleTrackingEnforced(true)
+            .build();
+        try (SimulatedTime time = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+            spout.nextTuple();
+            reset(consumerMock);
+
+            ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+            assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+            spout.ack(msgIdCaptor.getValue());
+
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
+
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<String, String>>>emptyMap()));
+
+            spout.nextTuple();
+            //NOTE(review): with never(), this fails only on a commitSync whose map lacks the partition; a commit that includes the partition would pass - confirm the matcher is not inverted
+            verify(consumerMock, never()).commitSync(argThat(new ArgumentMatcher<Map<TopicPartition, OffsetAndMetadata>>() {
+                @Override
+                public boolean matches(Object arg) {
+                    Map<TopicPartition, OffsetAndMetadata> castArg = (Map<TopicPartition, OffsetAndMetadata>) arg;
+                    return !castArg.containsKey(partition);
+                }
+            }));
+        }
+    }
+
+    @Test
+    public void testNoGuaranteeModeCommitsPolledTuples() throws Exception {
+        //When using the no guarantee mode, the spout must commit tuples periodically, regardless of whether they've been acked
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
+            .setTupleTrackingEnforced(true)
+            .build();
+
+        try (SimulatedTime time = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+            spout.nextTuple();
+
+            when(consumerMock.position(partition)).thenReturn(1L); //Presumably the source of the committed offset asserted below (1) - verify against the spout's commit code
+
+            ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+            assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
+
+            spout.nextTuple();
+
+            verify(consumerMock).commitAsync(commitCapture.capture(), isNull(OffsetCommitCallback.class));
+
+            CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
+            Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+            assertThat(committedOffsets.get(partition).offset(), is(1L));
+            assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
new file mode 100644
index 0000000..c2c46b5
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.KafkaSpout.TIMER_DELAY_MS;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.KafkaUnitRule;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaSpoutReactivationTest {
+    //Runs KafkaSpout against the Kafka instance provided by KafkaUnitRule and checks emits and commits across a deactivate()/activate() cycle.
+    @Rule
+    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
+
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+    private final TopologyContext topologyContext = mock(TopologyContext.class);
+    private final Map<String, Object> conf = new HashMap<>();
+    private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
+    private final long commitOffsetPeriodMs = 2_000; //Offset commit period set on the spout config in setUp()
+    private KafkaConsumer<String, String> consumerSpy;
+    private KafkaConsumer<String, String> postReactivationConsumerSpy;
+    private KafkaSpout<String, String> spout;
+    private final int maxPollRecords = 10; //Set as MAX_POLL_RECORDS_CONFIG on the consumer in setUp()
+
+    @Before
+    public void setUp() {
+        KafkaSpoutConfig<String, String> spoutConfig =
+            SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(
+                KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitRule.getKafkaUnit().getKafkaPort(),
+                    SingleTopicKafkaSpoutConfiguration.TOPIC))
+                .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
+                .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+                .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
+                .build();
+        KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactoryDefault<>();
+        this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
+        this.postReactivationConsumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
+        KafkaConsumerFactory<String, String> consumerFactoryMock = mock(KafkaConsumerFactory.class);
+        when(consumerFactoryMock.createConsumer(any(KafkaSpoutConfig.class)))
+            .thenReturn(consumerSpy)
+            .thenReturn(postReactivationConsumerSpy); //The first createConsumer call yields consumerSpy, the second postReactivationConsumerSpy
+        this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock);
+    }
+
+    private void prepareSpout(int messageCount) throws Exception {
+        SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitRule.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
+        SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
+    }
+
+    private KafkaSpoutMessageId emitOne() { //Calls nextTuple(), verifies exactly one emit on the collector mock, resets the mock and returns the captured message id
+        ArgumentCaptor<KafkaSpoutMessageId> messageId = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        spout.nextTuple();
+        verify(collector).emit(anyString(), anyList(), messageId.capture());
+        reset(collector);
+        return messageId.getValue();
+    }
+
+    @Test
+    public void testSpoutMustHandleReactivationGracefully() throws Exception {
+        try (Time.SimulatedTime time = new Time.SimulatedTime()) {
+            int messageCount = maxPollRecords * 2;
+            prepareSpout(messageCount);
+
+            //Emit and ack some tuples, ensure that some polled tuples remain cached in the spout by emitting fewer than maxPollRecords
+            int beforeReactivationEmits = maxPollRecords - 3;
+            for (int i = 0; i < beforeReactivationEmits - 1; i++) {
+                KafkaSpoutMessageId msgId = emitOne();
+                spout.ack(msgId);
+            }
+            //Emit one more tuple, but hold its ack until after the spout has been deactivated
+            KafkaSpoutMessageId ackAfterDeactivateMessageId = emitOne();
+            
+            //Cycle spout activation
+            spout.deactivate();
+            SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(consumerSpy, commitCapture, beforeReactivationEmits - 1); //The expected count excludes the tuple that is still unacked at this point
+            //Tuples may be acked/failed after the spout deactivates, so we have to be able to handle this too
+            spout.ack(ackAfterDeactivateMessageId);
+            spout.activate();
+
+            //Emit and ack the rest
+            for (int i = beforeReactivationEmits; i < messageCount; i++) {
+                KafkaSpoutMessageId msgId = emitOne();
+                spout.ack(msgId);
+            }
+
+            //Commit
+            Time.advanceTime(TIMER_DELAY_MS + commitOffsetPeriodMs);
+            spout.nextTuple();
+
+            //Verify that no more tuples are emitted and all tuples are committed
+            SingleTopicKafkaUnitSetupHelper.verifyAllMessagesCommitted(postReactivationConsumerSpy, commitCapture, messageCount);
+
+            reset(collector);
+            spout.nextTuple();
+            verify(collector, never()).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index a554a3b..29d2a22 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -15,11 +15,26 @@
  */
 package org.apache.storm.kafka.spout;
 
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -27,45 +42,31 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
-
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
-
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.when;
-
-import org.junit.Before;
 import org.mockito.Captor;
-
-import static org.mockito.Mockito.reset;
-
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.Matchers.hasKey;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
 
 public class KafkaSpoutRebalanceTest {
 
     @Captor
     private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
 
+    private final long offsetCommitPeriodMs = 2_000;
+    private final Map<String, Object> conf = new HashMap<>();
     private TopologyContext contextMock;
     private SpoutOutputCollector collectorMock;
-    private Map conf;
     private KafkaConsumer<String, String> consumerMock;
     private KafkaConsumerFactory<String, String> consumerFactoryMock;
 
@@ -74,7 +75,6 @@ public class KafkaSpoutRebalanceTest {
         MockitoAnnotations.initMocks(this);
         contextMock = mock(TopologyContext.class);
         collectorMock = mock(SpoutOutputCollector.class);
-        conf = new HashMap<>();
         consumerMock = mock(KafkaConsumer.class);
         consumerFactoryMock = new KafkaConsumerFactory<String, String>(){
             @Override
@@ -85,44 +85,39 @@ public class KafkaSpoutRebalanceTest {
     }
 
     //Returns messageIds in order of emission
-    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
-        //Setup spout with mock consumer so we can get at the rebalance listener
+    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture) {
+        //Setup spout with mock consumer so we can get at the rebalance listener   
         spout.open(conf, contextMock, collectorMock);
         spout.activate();
 
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        verify(consumerMock).subscribe(anyList(), rebalanceListenerCapture.capture());
-
         //Assign partitions to the spout
         ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
-        List<TopicPartition> assignedPartitions = new ArrayList<>();
+        Set<TopicPartition> assignedPartitions = new HashSet<>();
         assignedPartitions.add(partitionThatWillBeRevoked);
         assignedPartitions.add(assignedPartition);
         consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+        when(consumerMock.assignment()).thenReturn(assignedPartitions);
 
         //Make the consumer return a single message for each partition
-        Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
-        firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord<>(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
-        Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
-        secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord<>(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
         when(consumerMock.poll(anyLong()))
-                .thenReturn(new ConsumerRecords(firstPartitionRecords))
-                .thenReturn(new ConsumerRecords(secondPartitionRecords))
-                .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(partitionThatWillBeRevoked, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partitionThatWillBeRevoked, 0, 1))))
+            .thenReturn(new ConsumerRecords<>(Collections.singletonMap(assignedPartition, SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(assignedPartition, 0, 1))))
+            .thenReturn(new ConsumerRecords<>(new HashMap<TopicPartition, List<ConsumerRecord<String, String>>>()));
 
         //Emit the messages
         spout.nextTuple();
         ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        verify(collectorMock).emit(anyString(), anyList(), messageIdForRevokedPartition.capture());
+        verify(collectorMock).emit(Mockito.anyString(), Mockito.anyList(), messageIdForRevokedPartition.capture());
         reset(collectorMock);
         spout.nextTuple();
         ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        verify(collectorMock).emit(anyString(), anyList(), messageIdForAssignedPartition.capture());
+        verify(collectorMock).emit(Mockito.anyString(), Mockito.anyList(), messageIdForAssignedPartition.capture());
 
         //Now rebalance
         consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
         consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
-        
+        when(consumerMock.assignment()).thenReturn(Collections.singleton(assignedPartition));
+
         List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
         emittedMessageIds.add(messageIdForRevokedPartition.getValue());
         emittedMessageIds.add(messageIdForAssignedPartition.getValue());
@@ -132,49 +127,122 @@ public class KafkaSpoutRebalanceTest {
     @Test
     public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
-        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock);
-        String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
-        TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
-        TopicPartition assignedPartition = new TopicPartition(topic, 2);
-        
-        //Emit a message on each partition and revoke the first partition
-        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
-        
-        //Ack both emitted tuples
-        spout.ack(emittedMessageIds.get(0));
-        spout.ack(emittedMessageIds.get(1));
-
-        //Ensure the commit timer has expired
-        Thread.sleep(510);
-
-        //Make the spout commit any acked tuples
-        spout.nextTuple();
-        //Verify that it only committed the message on the assigned partition
-        verify(consumerMock).commitSync(commitCapture.capture());
-
-        Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
-        assertThat(commitCaptureMap, hasKey(assignedPartition));
-        assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+            Subscription subscriptionMock = mock(Subscription.class);
+            doNothing()
+                .when(subscriptionMock)
+                .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
+            KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+                .build(), consumerFactoryMock);
+            String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+            TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+            TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+            //Emit a message on each partition and revoke the first partition
+            List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
+                spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
+
+            //Ack both emitted tuples
+            spout.ack(emittedMessageIds.get(0));
+            spout.ack(emittedMessageIds.get(1));
+
+            //Advance simulated time past the commit period plus the timer delay, so the commit timer has expired
+            Time.advanceTime(offsetCommitPeriodMs + KafkaSpout.TIMER_DELAY_MS);
+            //Make the spout commit any acked tuples
+            spout.nextTuple();
+            //Verify that exactly one commit was made, and that it only covers the assigned partition
+            verify(consumerMock, times(1)).commitSync(commitCapture.capture());
+
+            Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
+            assertThat(commitCaptureMap, hasKey(assignedPartition));
+            assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+        }
     }
-    
+
     @Test
     public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        Subscription subscriptionMock = mock(Subscription.class);
+        doNothing()
+            .when(subscriptionMock)
+            .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
         KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
-        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock);
+        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+            .setOffsetCommitPeriodMs(10)
+            .setRetry(retryServiceMock)
+            .build(), consumerFactoryMock);
         String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
         TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
         TopicPartition assignedPartition = new TopicPartition(topic, 2);
-        
+
+        when(retryServiceMock.getMessageId(Mockito.any(ConsumerRecord.class)))
+            .thenReturn(new KafkaSpoutMessageId(partitionThatWillBeRevoked, 0))
+            .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0));
+
         //Emit a message on each partition and revoke the first partition
-        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
+            spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
+
+        //Check that the retry service was asked for exactly two message ids
+        verify(retryServiceMock, times(2)).getMessageId(Mockito.any(ConsumerRecord.class));
         
         //Fail both emitted tuples
         spout.fail(emittedMessageIds.get(0));
         spout.fail(emittedMessageIds.get(1));
-        
+
         //Check that only the tuple on the currently assigned partition is retried
         verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
         verify(retryServiceMock).schedule(emittedMessageIds.get(1));
     }
+
+    @Test
+    public void testReassignPartitionSeeksForOnlyNewPartitions() {
+        /*
+         * When partitions are reassigned, the spout should seek with the first poll offset strategy for new partitions.
+         * Previously assigned partitions should be left alone, since the spout keeps the emitted and acked state for those.
+         */
+
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        Subscription subscriptionMock = mock(Subscription.class);
+        doNothing()
+            .when(subscriptionMock)
+            .subscribe(any(KafkaConsumer.class), rebalanceListenerCapture.capture(), any(TopologyContext.class));
+        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
+            .build(), consumerFactoryMock);
+        String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+        TopicPartition assignedPartition = new TopicPartition(topic, 1);
+        TopicPartition newPartition = new TopicPartition(topic, 2);
+
+        //Setup spout with mock consumer so we can get at the rebalance listener
+        spout.open(conf, contextMock, collectorMock);
+        spout.activate();
+
+        //Assign the initial partition to the spout through the captured rebalance listener
+        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+        Set<TopicPartition> assignedPartitions = new HashSet<>();
+        assignedPartitions.add(assignedPartition);
+        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+        reset(consumerMock); //Drop interactions recorded so far, so the seek verifications below only see the rebalance
+        
+        //Stub consumer.committed() so it looks like some messages have been committed on each partition
+        long committedOffset = 500;
+        when(consumerMock.committed(assignedPartition)).thenReturn(new OffsetAndMetadata(committedOffset));
+        when(consumerMock.committed(newPartition)).thenReturn(new OffsetAndMetadata(committedOffset));
+
+        //Now rebalance and add a new partition
+        consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
+        Set<TopicPartition> newAssignedPartitions = new HashSet<>();
+        newAssignedPartitions.add(assignedPartition);
+        newAssignedPartitions.add(newPartition);
+        consumerRebalanceListener.onPartitionsAssigned(newAssignedPartitions);
+        
+        //This partition was previously assigned, so the consumer position shouldn't change
+        verify(consumerMock, never()).seek(eq(assignedPartition), anyLong());
+        //This partition is new, and should start at the committed offset
+        verify(consumerMock).seek(newPartition, committedOffset);
+    }
 }