Posted to commits@storm.apache.org by bo...@apache.org on 2016/12/08 18:14:36 UTC

[2/5] storm git commit: Merge branch 'master' of https://github.com/apache/storm into STORM-2104

Merge branch 'master' of https://github.com/apache/storm into STORM-2104


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bd1f5c54
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bd1f5c54
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bd1f5c54

Branch: refs/heads/master
Commit: bd1f5c54752f67b484a83c26667331234234d3a3
Parents: 11fcaff a912cf3
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Sun Nov 20 12:49:49 2016 +0100
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Sun Nov 20 13:01:22 2016 +0100

----------------------------------------------------------------------
 CHANGELOG.md                                    |  16 +
 examples/storm-kafka-client-examples/pom.xml    | 134 +++
 .../TridentKafkaClientWordCountNamedTopics.java | 156 ++++
 ...identKafkaClientWordCountWildcardTopics.java |  46 ++
 examples/storm-kafka-examples/pom.xml           |  18 +
 .../storm/kafka/TridentKafkaTopology.java       |  91 ---
 .../kafka/trident/KafkaProducerTopology.java    |  67 ++
 .../storm/kafka/trident/LocalSubmitter.java     |  91 +++
 .../trident/TridentKafkaConsumerTopology.java   |  94 +++
 .../kafka/trident/TridentKafkaTopology.java     |  89 ++
 .../kafka/trident/TridentKafkaWordCount.java    | 140 ++++
 examples/storm-starter/pom.xml                  |  16 -
 .../starter/trident/DebugMemoryMapState.java    |   2 +-
 .../starter/trident/TridentKafkaWordCount.java  | 278 -------
 external/sql/pom.xml                            |   1 +
 .../apache/storm/sql/parser/SqlCreateTable.java |  20 +-
 .../test/org/apache/storm/sql/TestStormSql.java |  11 +-
 .../storm-sql-external/storm-sql-kafka/pom.xml  |   4 -
 .../sql/kafka/KafkaDataSourcesProvider.java     |  63 +-
 .../sql/kafka/TestKafkaDataSourcesProvider.java |  31 +-
 .../storm-sql-mongodb/pom.xml                   |  84 ++
 .../sql/mongodb/MongoDataSourcesProvider.java   | 121 +++
 ...apache.storm.sql.runtime.DataSourcesProvider |  16 +
 .../mongodb/TestMongoDataSourcesProvider.java   | 122 +++
 .../storm-sql-external/storm-sql-redis/pom.xml  |   4 -
 .../sql/redis/RedisDataSourcesProvider.java     |  48 +-
 .../sql/redis/TestRedisDataSourcesProvider.java |  26 +-
 external/sql/storm-sql-runtime/pom.xml          |  14 +
 .../storm/sql/runtime/DataSourcesProvider.java  |   5 +-
 .../storm/sql/runtime/DataSourcesRegistry.java  |   5 +-
 .../sql/runtime/serde/avro/AvroScheme.java      |  74 ++
 .../sql/runtime/serde/avro/AvroSerializer.java  |  72 ++
 .../sql/runtime/serde/avro/CachedSchemas.java   |  41 +
 .../storm/sql/runtime/serde/csv/CsvScheme.java  |  70 ++
 .../sql/runtime/serde/csv/CsvSerializer.java    |  59 ++
 .../sql/runtime/serde/json/JsonScheme.java      |   2 +-
 .../storm/sql/runtime/serde/tsv/TsvScheme.java  |  58 ++
 .../sql/runtime/serde/tsv/TsvSerializer.java    |  54 ++
 .../storm/sql/runtime/utils/FieldInfoUtils.java |  39 +
 .../storm/sql/runtime/utils/SerdeUtils.java     | 123 +++
 .../apache/storm/sql/runtime/utils/Utils.java   |  55 ++
 .../apache/storm/sql/TestAvroSerializer.java    |  72 ++
 .../org/apache/storm/sql/TestCsvSerializer.java |  54 ++
 .../org/apache/storm/sql/TestTsvSerializer.java |  46 ++
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       |  25 +-
 external/storm-kafka-client/pom.xml             |  26 +
 .../apache/storm/kafka/spout/KafkaSpout.java    |  31 +-
 .../storm/kafka/spout/KafkaSpoutStream.java     |   2 +-
 .../storm/kafka/spout/KafkaSpoutStreams.java    |   3 +
 .../spout/KafkaSpoutStreamsNamedTopics.java     |  11 +
 .../spout/KafkaSpoutStreamsWildcardTopics.java  |   6 +
 .../trident/KafkaTridentSpoutBatchMetadata.java |  83 ++
 .../spout/trident/KafkaTridentSpoutEmitter.java | 197 +++++
 .../spout/trident/KafkaTridentSpoutManager.java | 119 +++
 .../spout/trident/KafkaTridentSpoutOpaque.java  |  80 ++
 .../KafkaTridentSpoutOpaqueCoordinator.java     |  64 ++
 .../KafkaTridentSpoutTopicPartition.java        |  68 ++
 ...KafkaTridentSpoutTopicPartitionRegistry.java |  48 ++
 .../trident/KafkaTridentSpoutTransactional.java |  48 ++
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |   4 +-
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 240 ++++++
 .../SingleTopicKafkaSpoutConfiguration.java     |  30 +-
 .../builders/TopicKeyValueTupleBuilder.java     |   2 +-
 .../kafka/trident/OpaqueTridentKafkaSpout.java  |   5 +-
 pom.xml                                         |   1 +
 .../org/apache/storm/daemon/local_executor.clj  |  42 -
 .../src/clj/org/apache/storm/daemon/worker.clj  | 818 -------------------
 storm-core/src/clj/org/apache/storm/testing.clj |  28 +-
 .../src/jvm/org/apache/storm/Constants.java     |   6 +-
 .../src/jvm/org/apache/storm/daemon/Task.java   |  46 +-
 .../storm/daemon/supervisor/AdvancedFSOps.java  |   9 +-
 .../storm/daemon/supervisor/BasicContainer.java |   2 +-
 .../storm/daemon/supervisor/LocalContainer.java |  11 +-
 .../daemon/supervisor/ReadClusterState.java     |   2 +-
 .../storm/daemon/worker/LogConfigManager.java   | 156 ++++
 .../org/apache/storm/daemon/worker/Worker.java  | 454 ++++++++++
 .../apache/storm/daemon/worker/WorkerState.java | 691 ++++++++++++++++
 .../jvm/org/apache/storm/executor/Executor.java |  55 +-
 .../apache/storm/executor/ExecutorShutdown.java |   5 +-
 .../apache/storm/executor/ExecutorTransfer.java |  15 +-
 .../apache/storm/executor/IRunningExecutor.java |   2 +-
 .../apache/storm/executor/LocalExecutor.java    |  56 ++
 .../storm/executor/bolt/BoltExecutor.java       |  10 +-
 .../storm/executor/spout/SpoutExecutor.java     |   3 +-
 .../DeserializingConnectionCallback.java        |  10 +-
 .../storm/scheduler/resource/RAS_Node.java      |   4 +-
 .../org/apache/storm/security/auth/AutoSSL.java | 161 ++++
 .../storm/security/auth/ThriftClient.java       |  13 +-
 .../org/apache/storm/task/TopologyContext.java  |   3 +
 .../org/apache/storm/utils/TransferDrainer.java |  26 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 116 ++-
 .../apache/storm/messaging/netty_unit_test.clj  |  13 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |   6 +-
 .../test/clj/org/apache/storm/worker_test.clj   | 205 -----
 .../daemon/supervisor/BasicContainerTest.java   |   2 +-
 .../daemon/worker/LogConfigManagerTest.java     | 219 +++++
 .../apache/storm/daemon/worker/WorkerTest.java  |  39 +
 .../apache/storm/security/auth/AutoSSLTest.java | 136 +++
 storm-dist/binary/src/main/assembly/binary.xml  |   7 +
 99 files changed, 5299 insertions(+), 1797 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/pom.xml
index 34b9fdc,2d9a844..2b56336
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@@ -70,8 -77,21 +84,20 @@@
          <dependency>
              <groupId>junit</groupId>
              <artifactId>junit</artifactId>
 -            <version>4.11</version>
              <scope>test</scope>
          </dependency>
+         <dependency>
+             <groupId>info.batey.kafka</groupId>
+             <artifactId>kafka-unit</artifactId>
+             <version>0.6</version>
+             <scope>test</scope>
+         </dependency>
+         <dependency>
+             <groupId>org.slf4j</groupId>
+             <artifactId>log4j-over-slf4j</artifactId>
+             <version>${log4j-over-slf4j.version}</version>
+             <scope>test</scope>
+         </dependency>
      </dependencies>
  
      <build>

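Note: the two test-scoped dependencies added above give the module an in-process Kafka broker (kafka-unit) and route Kafka's log4j logging through slf4j, presumably backing the new SingleTopicKafkaSpoutTest. As a hedged sketch of how kafka-unit is typically driven -- method names follow the kafka-unit README and should be verified against the 0.6 artifact:

    import info.batey.kafka.unit.KafkaUnit;

    // Spins up an in-process ZooKeeper + Kafka broker for tests.
    public class KafkaUnitSketch {
        public static void main(String[] args) throws Exception {
            KafkaUnit kafkaUnit = new KafkaUnit(2181, 9092); // ZooKeeper port, broker port
            kafkaUnit.startup();
            try {
                kafkaUnit.createTopic("test-topic");
                // ... point a spout or consumer at localhost:9092 here ...
            } finally {
                kafkaUnit.shutdown();
            }
        }
    }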
http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 5534082,439492b..ce01a76
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@@ -299,19 -289,18 +302,21 @@@ public class KafkaSpout<K, V> extends B
              LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
          } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
              LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
 -        } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
 -            final List<Object> tuple = tuplesBuilder.buildTuple(record);
 -            kafkaSpoutStreams.emit(collector, tuple, msgId);
 -            emitted.add(msgId);
 -            numUncommittedOffsets++;
 -            if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
 -                retryService.remove(msgId);  // re-emitted hence remove from failed
 +        } else {
 +            boolean isScheduled = retryService.isScheduled(msgId);
 +            if (!isScheduled || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
 +                final List<Object> tuple = tuplesBuilder.buildTuple(record);
 +                kafkaSpoutStreams.emit(collector, tuple, msgId);
 +                emitted.add(msgId);
 +                numUncommittedOffsets++;
 +                if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
 +                    retryService.remove(msgId);
 +                }
 +                LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
++               return true;
              }
 -            LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
 -            return true;
          }
+         return false;
      }
  
      private void commitOffsetsForAckedTuples() {

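Note: the hunk above restructures the emit path so the retryService.isScheduled(msgId) result is computed once, a tuple scheduled for retry is unscheduled exactly when it is re-emitted, and the method reports true only when a tuple actually went out. A minimal self-contained model of the merged control flow; RetryService is a hypothetical stand-in exposing only the three calls the diff uses, and message ids are reduced to longs:

    import java.util.HashSet;
    import java.util.Set;

    public class EmitDecisionSketch {
        // Stand-in for KafkaSpoutRetryService, trimmed to the calls in the diff.
        interface RetryService {
            boolean isScheduled(long msgId); // has been scheduled for retry
            boolean isReady(long msgId);     // scheduled and its retry delay has elapsed
            void remove(long msgId);         // drop from the retry schedule
        }

        private final Set<Long> acked = new HashSet<>();
        private final Set<Long> emitted = new HashSet<>();
        private final RetryService retryService;
        private int numUncommittedOffsets = 0;

        EmitDecisionSketch(RetryService retryService) {
            this.retryService = retryService;
        }

        // Returns true only when a tuple was actually emitted, mirroring the merge.
        boolean emitTupleIfNotEmitted(long msgId) {
            if (acked.contains(msgId)) {
                return false;                       // already fully processed; skip
            } else if (emitted.contains(msgId)) {
                return false;                       // in flight, pending ack or fail; skip
            } else {
                boolean isScheduled = retryService.isScheduled(msgId);
                // not scheduled <=> never failed; ready <=> failed and due for retry
                if (!isScheduled || retryService.isReady(msgId)) {
                    emitted.add(msgId);
                    numUncommittedOffsets++;
                    if (isScheduled) {
                        retryService.remove(msgId); // re-emitted, so unschedule
                    }
                    return true;
                }
            }
            return false;                           // scheduled but not yet due
        }
    }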
http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index b5428f9,0000000..3e077ab
mode 100644,000000..100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@@ -1,166 -1,0 +1,166 @@@
 +/*
 + * Copyright 2016 The Apache Software Foundation.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.kafka.spout;
 +
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 +import org.apache.kafka.clients.consumer.ConsumerRecord;
 +import org.apache.kafka.clients.consumer.ConsumerRecords;
 +import org.apache.kafka.clients.consumer.KafkaConsumer;
 +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 +import org.apache.kafka.common.TopicPartition;
 +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
 +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
 +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 +import org.apache.storm.spout.SpoutOutputCollector;
 +import org.apache.storm.task.TopologyContext;
 +import org.junit.Test;
 +import org.mockito.ArgumentCaptor;
 +import static org.mockito.Matchers.anyCollection;
 +import static org.mockito.Matchers.anyLong;
 +import static org.mockito.Matchers.anyObject;
 +import static org.mockito.Mockito.when;
 +import org.junit.Before;
 +import org.mockito.Captor;
 +import static org.mockito.Mockito.reset;
 +import org.mockito.MockitoAnnotations;
 +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
 +import static org.hamcrest.CoreMatchers.not;
 +import static org.hamcrest.Matchers.hasKey;
 +import static org.junit.Assert.assertThat;
 +import static org.mockito.Mockito.mock;
 +import static org.mockito.Mockito.never;
 +import static org.mockito.Mockito.verify;
 +
 +public class KafkaSpoutRebalanceTest {
 +
 +    @Captor
 +    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
 +
 +    private TopologyContext contextMock;
 +    private SpoutOutputCollector collectorMock;
 +    private Map conf;
 +    private KafkaConsumer<String, String> consumerMock;
 +    private KafkaConsumerFactory<String, String> consumerFactoryMock;
 +
 +    @Before
 +    public void setUp() {
 +        MockitoAnnotations.initMocks(this);
 +        contextMock = mock(TopologyContext.class);
 +        collectorMock = mock(SpoutOutputCollector.class);
 +        conf = new HashMap<>();
 +        consumerMock = mock(KafkaConsumer.class);
 +        consumerFactoryMock = (kafkaSpoutConfig) -> consumerMock;
 +    }
 +
 +    //Returns messageIds in order of emission
 +    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
 +        //Setup spout with mock consumer so we can get at the rebalance listener
 +        spout.open(conf, contextMock, collectorMock);
 +        spout.activate();
 +
 +        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
 +        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
 +
 +        //Assign partitions to the spout
 +        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
 +        List<TopicPartition> assignedPartitions = new ArrayList<>();
 +        assignedPartitions.add(partitionThatWillBeRevoked);
 +        assignedPartitions.add(assignedPartition);
 +        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
 +
 +        //Make the consumer return a single message for each partition
 +        Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
 +        firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
 +        Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
 +        secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
 +        when(consumerMock.poll(anyLong()))
 +                .thenReturn(new ConsumerRecords(firstPartitionRecords))
 +                .thenReturn(new ConsumerRecords(secondPartitionRecords))
 +                .thenReturn(new ConsumerRecords(Collections.emptyMap()));
 +
 +        //Emit the messages
 +        spout.nextTuple();
 +        ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
 +        verify(collectorMock).emit(anyObject(), anyObject(), messageIdForRevokedPartition.capture());
 +        reset(collectorMock);
 +        spout.nextTuple();
 +        ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
 +        verify(collectorMock).emit(anyObject(), anyObject(), messageIdForAssignedPartition.capture());
 +
 +        //Now rebalance
 +        consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
 +        consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
 +        
 +        List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
 +        emittedMessageIds.add(messageIdForRevokedPartition.getValue());
 +        emittedMessageIds.add(messageIdForAssignedPartition.getValue());
 +        return emittedMessageIds;
 +    }
 +
 +    @Test
 +    public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
 +        //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
 +        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock);
-         String topic = SingleTopicKafkaSpoutConfiguration.topic;
++        String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
 +        TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
 +        TopicPartition assignedPartition = new TopicPartition(topic, 2);
 +        
 +        //Emit a message on each partition and revoke the first partition
 +        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
 +        
 +        //Ack both emitted tuples
 +        spout.ack(emittedMessageIds.get(0));
 +        spout.ack(emittedMessageIds.get(1));
 +
 +        //Ensure the commit timer has expired
 +        Thread.sleep(510);
 +
 +        //Make the spout commit any acked tuples
 +        spout.nextTuple();
 +        //Verify that it only committed the message on the assigned partition
 +        verify(consumerMock).commitSync(commitCapture.capture());
 +
 +        Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
 +        assertThat(commitCaptureMap, hasKey(assignedPartition));
 +        assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
 +    }
 +    
 +    @Test
 +    public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
 +        //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
 +        KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
 +        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock);
-         String topic = SingleTopicKafkaSpoutConfiguration.topic;
++        String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
 +        TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
 +        TopicPartition assignedPartition = new TopicPartition(topic, 2);
 +        
 +        //Emit a message on each partition and revoke the first partition
 +        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
 +        
 +        //Fail both emitted tuples
 +        spout.fail(emittedMessageIds.get(0));
 +        spout.fail(emittedMessageIds.get(1));
 +        
 +        //Check that only the tuple on the currently assigned partition is retried
 +        verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
 +        verify(retryServiceMock).schedule(emittedMessageIds.get(1));
 +    }
 +}

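Note: the core device in the new test above is capturing the ConsumerRebalanceListener that KafkaSpout hands to KafkaConsumer.subscribe(), then driving revoke/assign cycles by hand. Isolated for reference as a sketch, using the same Mockito Matchers style as the test:

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.mockito.ArgumentCaptor;
    import static org.mockito.Matchers.any;
    import static org.mockito.Mockito.verify;

    class RebalanceCaptureSketch {
        // After spout.open()/activate() has run against the mock, pull the
        // listener back out and call onPartitionsRevoked/Assigned directly.
        @SuppressWarnings("unchecked")
        static ConsumerRebalanceListener captureListener(KafkaConsumer<String, String> consumerMock) {
            ArgumentCaptor<ConsumerRebalanceListener> captor =
                    ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
            verify(consumerMock).subscribe(any(Collection.class), captor.capture());
            return captor.getValue();
        }
    }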
http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 157543c,baece93..6921f7c
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@@ -46,17 -46,9 +46,17 @@@ public class SingleTopicKafkaSpoutConfi
          return tp.createTopology();
      }
  
-     static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
 -    public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
 -        return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
 -                .setOffsetCommitPeriodMs(10_000)
++    static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
 +        return getKafkaSpoutConfig(kafkaSpoutStreams, port, 10_000);
 +    }
-     
-     static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
++
++    static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
 +        return getKafkaSpoutConfig(kafkaSpoutStreams, port, offsetCommitPeriodMs, getRetryService());
 +    }
 +    
 +    static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
 +        return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), retryService)
 +                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
                  .setFirstPollOffsetStrategy(EARLIEST)
                  .setMaxUncommittedOffsets(250)
                  .setPollTimeoutMs(1000)
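
Note: the hunk above layers three getKafkaSpoutConfig overloads so a test can pin down only what it cares about; each shorter form delegates to the next, filling in a 10_000 ms commit period and the default retry service. A hedged usage sketch -- the wrapper class is illustrative, port -1 mirrors the rebalance test (no socket is ever opened), and getRetryService()'s accessibility is assumed:

    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;

    public class ConfigOverloadsSketch {
        public static void main(String[] args) {
            // Defaults: 10_000 ms commit period, default retry service.
            KafkaSpoutConfig<String, String> defaults =
                    getKafkaSpoutConfig(getKafkaSpoutStreams(), -1);
            // Same, but commit acked offsets every 10 ms.
            KafkaSpoutConfig<String, String> fastCommit =
                    getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10);
            // Full control: commit period plus an explicit retry service.
            KafkaSpoutConfig<String, String> customRetry =
                    getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, getRetryService());
        }
    }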