You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2018/11/29 18:19:10 UTC
[1/3] storm git commit: STORM-3290: Split configuration for
storm-kafka-client Trident and non-Trident spout into two classes
Repository: storm
Updated Branches:
refs/heads/master 52cb6b20a -> 6d871a73e
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
index 1ccc9a7..059c8c7 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
@@ -45,14 +45,14 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.ConsumerFactory;
import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
import org.apache.storm.kafka.spout.subscription.TopicAssigner;
import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.apache.storm.kafka.spout.trident.config.builder.SingleTopicKafkaTridentSpoutConfiguration;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.topology.TransactionAttempt;
@@ -91,7 +91,7 @@ public class KafkaTridentSpoutEmitterTest {
@Test
public void testGetOrderedPartitionsIsConsistent() {
KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
- SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+ SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
.build(),
topologyContextMock,
config -> consumer, new TopicAssigner());
@@ -129,7 +129,7 @@ public class KafkaTridentSpoutEmitterTest {
});
KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
- SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mock(TopicFilter.class), partitionerMock, -1)
+ SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(mock(TopicFilter.class), partitionerMock, -1)
.build(),
topologyContextMock,
config -> consumer, new TopicAssigner());
@@ -154,7 +154,7 @@ public class KafkaTridentSpoutEmitterTest {
TopicAssigner assignerMock = mock(TopicAssigner.class);
KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
- SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+ SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
.build(),
topologyContextMock,
config -> consumer, assignerMock);
@@ -179,7 +179,7 @@ public class KafkaTridentSpoutEmitterTest {
List<ConsumerRecord<String, String>> records = SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, numRecords);
records.forEach(record -> consumer.addRecord(record));
return new KafkaTridentSpoutEmitter<>(
- SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+ SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
.setRecordTranslator(r -> new Values(r.offset()), new Fields("offset"))
.setFirstPollOffsetStrategy(firstPollOffsetStrategy)
.build(),
@@ -242,8 +242,8 @@ public class KafkaTridentSpoutEmitterTest {
when(consumerMock.assignment()).thenReturn(Collections.singleton(tp));
ConsumerFactory<String, String> consumerFactory = spoutConfig -> consumerMock;
KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
- SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
- .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
+ SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+ .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
.build(),
mock(TopologyContext.class),
consumerFactory, new TopicAssigner());
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
index 6f46a36..ab8e3c2 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
@@ -33,10 +33,9 @@ import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.apache.storm.kafka.spout.trident.config.builder.SingleTopicKafkaTridentSpoutConfiguration;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;
import org.junit.Test;
@@ -52,8 +51,8 @@ public class KafkaTridentSpoutOpaqueCoordinatorTest {
TopicFilter mockFilter = mock(TopicFilter.class);
when(mockFilter.getAllSubscribedPartitions(any())).thenReturn(Collections.singleton(expectedPartition));
- KafkaSpoutConfig<String, String> spoutConfig =
- SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
+ KafkaTridentSpoutConfig<String, String> spoutConfig =
+ SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
.build();
KafkaTridentSpoutCoordinator<String, String> coordinator = new KafkaTridentSpoutCoordinator<>(spoutConfig, ignored -> mockConsumer);
@@ -80,8 +79,8 @@ public class KafkaTridentSpoutOpaqueCoordinatorTest {
.thenReturn(Collections.singleton(expectedPartition))
.thenReturn(allPartitions);
- KafkaSpoutConfig<String, String> spoutConfig =
- SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
+ KafkaTridentSpoutConfig<String, String> spoutConfig =
+ SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
.build();
KafkaTridentSpoutCoordinator<String, String> coordinator = new KafkaTridentSpoutCoordinator<>(spoutConfig, ignored -> mockConsumer);
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/config/builder/SingleTopicKafkaTridentSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/config/builder/SingleTopicKafkaTridentSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/config/builder/SingleTopicKafkaTridentSpoutConfiguration.java
new file mode 100644
index 0000000..5adeb1e
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/config/builder/SingleTopicKafkaTridentSpoutConfiguration.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident.config.builder;
+
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class SingleTopicKafkaTridentSpoutConfiguration {
+
+ public static final String TOPIC = "test";
+
+ public static KafkaTridentSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(int port) {
+ return setCommonSpoutConfig(KafkaTridentSpoutConfig.builder("127.0.0.1:" + port, TOPIC));
+ }
+
+ public static KafkaTridentSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(TopicFilter topicFilter, ManualPartitioner topicPartitioner, int port) {
+ return setCommonSpoutConfig(new KafkaTridentSpoutConfig.Builder<>("127.0.0.1:" + port, topicFilter, topicPartitioner));
+ }
+
+ public static KafkaTridentSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaTridentSpoutConfig.Builder<String, String> config) {
+ return config.setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()),
+ new Fields("topic", "key", "value"))
+ .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
+ .setFirstPollOffsetStrategy(EARLIEST)
+ .setPollTimeoutMs(1000);
+ }
+}
[3/3] storm git commit: Merge branch 'STORM-3290' into asfgit-master
Posted by sr...@apache.org.
Merge branch 'STORM-3290' into asfgit-master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6d871a73
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6d871a73
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6d871a73
Branch: refs/heads/master
Commit: 6d871a73e7341c956a56d09f7b3f7a367a5752e3
Parents: 52cb6b2 63de17e
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Thu Nov 29 19:18:00 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Thu Nov 29 19:18:00 2018 +0100
----------------------------------------------------------------------
.../KafkaSpoutTopologyMainNamedTopics.java | 2 +-
.../KafkaSpoutTopologyMainWildcardTopics.java | 2 +-
.../TridentKafkaClientTopologyNamedTopics.java | 27 +-
...ridentKafkaClientTopologyWildcardTopics.java | 12 +-
.../apache/storm/perf/KafkaClientHdfsTopo.java | 3 +-
.../perf/KafkaClientSpoutNullBoltTopo.java | 3 +-
.../kafka/spout/FirstPollOffsetStrategy.java | 42 +++
.../apache/storm/kafka/spout/KafkaSpout.java | 11 +-
.../storm/kafka/spout/KafkaSpoutConfig.java | 330 ++++---------------
.../spout/internal/CommonKafkaSpoutConfig.java | 264 +++++++++++++++
.../kafka/spout/internal/ConsumerFactory.java | 4 +-
.../spout/internal/ConsumerFactoryDefault.java | 6 +-
.../spout/trident/KafkaTridentSpoutConfig.java | 100 ++++++
.../trident/KafkaTridentSpoutCoordinator.java | 8 +-
.../spout/trident/KafkaTridentSpoutEmitter.java | 32 +-
.../spout/trident/KafkaTridentSpoutOpaque.java | 4 +-
.../trident/KafkaTridentSpoutTransactional.java | 4 +-
.../trident/internal/OutputFieldsExtractor.java | 4 +-
.../kafka/spout/KafkaSpoutAbstractTest.java | 4 +-
.../storm/kafka/spout/KafkaSpoutConfigTest.java | 1 -
.../kafka/spout/KafkaSpoutReactivationTest.java | 4 +-
.../kafka/spout/KafkaSpoutRebalanceTest.java | 2 +-
...outTopologyDeployActivateDeactivateTest.java | 2 +-
.../kafka/spout/MaxUncommittedOffsetTest.java | 2 +-
.../SingleTopicKafkaSpoutConfiguration.java | 2 +-
.../KafkaTridentSpoutBatchMetadataTest.java | 2 -
.../trident/KafkaTridentSpoutEmitterTest.java | 16 +-
.../KafkaTridentSpoutOpaqueCoordinatorTest.java | 11 +-
...ngleTopicKafkaTridentSpoutConfiguration.java | 47 +++
29 files changed, 591 insertions(+), 360 deletions(-)
----------------------------------------------------------------------
[2/3] storm git commit: STORM-3290: Split configuration for
storm-kafka-client Trident and non-Trident spout into two classes
Posted by sr...@apache.org.
STORM-3290: Split configuration for storm-kafka-client Trident and non-Trident spout into two classes
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/63de17ed
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/63de17ed
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/63de17ed
Branch: refs/heads/master
Commit: 63de17eda9220163867ea051d03adc44eaa7d113
Parents: 40f1b61
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Tue Nov 20 13:03:25 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Tue Nov 20 19:57:25 2018 +0100
----------------------------------------------------------------------
.../KafkaSpoutTopologyMainNamedTopics.java | 2 +-
.../KafkaSpoutTopologyMainWildcardTopics.java | 2 +-
.../TridentKafkaClientTopologyNamedTopics.java | 27 +-
...ridentKafkaClientTopologyWildcardTopics.java | 12 +-
.../apache/storm/perf/KafkaClientHdfsTopo.java | 3 +-
.../perf/KafkaClientSpoutNullBoltTopo.java | 3 +-
.../kafka/spout/FirstPollOffsetStrategy.java | 42 +++
.../apache/storm/kafka/spout/KafkaSpout.java | 11 +-
.../storm/kafka/spout/KafkaSpoutConfig.java | 330 ++++---------------
.../spout/internal/CommonKafkaSpoutConfig.java | 264 +++++++++++++++
.../kafka/spout/internal/ConsumerFactory.java | 4 +-
.../spout/internal/ConsumerFactoryDefault.java | 6 +-
.../spout/trident/KafkaTridentSpoutConfig.java | 100 ++++++
.../trident/KafkaTridentSpoutCoordinator.java | 8 +-
.../spout/trident/KafkaTridentSpoutEmitter.java | 32 +-
.../spout/trident/KafkaTridentSpoutOpaque.java | 4 +-
.../trident/KafkaTridentSpoutTransactional.java | 4 +-
.../trident/internal/OutputFieldsExtractor.java | 4 +-
.../kafka/spout/KafkaSpoutAbstractTest.java | 4 +-
.../storm/kafka/spout/KafkaSpoutConfigTest.java | 1 -
.../kafka/spout/KafkaSpoutReactivationTest.java | 4 +-
.../kafka/spout/KafkaSpoutRebalanceTest.java | 2 +-
...outTopologyDeployActivateDeactivateTest.java | 2 +-
.../kafka/spout/MaxUncommittedOffsetTest.java | 2 +-
.../SingleTopicKafkaSpoutConfiguration.java | 2 +-
.../KafkaTridentSpoutBatchMetadataTest.java | 2 -
.../trident/KafkaTridentSpoutEmitterTest.java | 16 +-
.../KafkaTridentSpoutOpaqueCoordinatorTest.java | 11 +-
...ngleTopicKafkaTridentSpoutConfiguration.java | 47 +++
29 files changed, 591 insertions(+), 360 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java
index 6703738..beb7948 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java
@@ -18,7 +18,7 @@
package org.apache.storm.kafka.spout;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainWildcardTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainWildcardTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainWildcardTopics.java
index 714f476..a574078 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainWildcardTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainWildcardTopics.java
@@ -18,7 +18,7 @@
package org.apache.storm.kafka.spout;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java
index 6fecb5c..3c92a22 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java
@@ -18,11 +18,10 @@
package org.apache.storm.kafka.trident;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
import java.io.Serializable;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.Config;
@@ -32,10 +31,7 @@ import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.bolt.KafkaProducerTopology;
import org.apache.storm.kafka.spout.Func;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutTransactional;
import org.apache.storm.trident.spout.ITridentDataSource;
@@ -52,12 +48,12 @@ public class TridentKafkaClientTopologyNamedTopics {
private static final String TOPIC_2 = "test-trident-1";
private static final String KAFKA_LOCAL_BROKER = "localhost:9092";
- private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, String> spoutConfig) {
+ private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque(KafkaTridentSpoutConfig<String, String> spoutConfig) {
return new KafkaTridentSpoutOpaque<>(spoutConfig);
}
private KafkaTridentSpoutTransactional<String, String> newKafkaTridentSpoutTransactional(
- KafkaSpoutConfig<String, String> spoutConfig) {
+ KafkaTridentSpoutConfig<String, String> spoutConfig) {
return new KafkaTridentSpoutTransactional<>(spoutConfig);
}
@@ -74,23 +70,14 @@ public class TridentKafkaClientTopologyNamedTopics {
}
}
- protected KafkaSpoutConfig<String, String> newKafkaSpoutConfig(String bootstrapServers) {
- return KafkaSpoutConfig.builder(bootstrapServers, TOPIC_1, TOPIC_2)
- .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup_" + System.nanoTime())
+ protected KafkaTridentSpoutConfig<String, String> newKafkaSpoutConfig(String bootstrapServers) {
+ return KafkaTridentSpoutConfig.builder(bootstrapServers, TOPIC_1, TOPIC_2)
.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
.setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
- .setRetry(newRetryService())
- .setOffsetCommitPeriodMs(10_000)
.setFirstPollOffsetStrategy(EARLIEST)
- .setMaxUncommittedOffsets(250)
.build();
}
- protected KafkaSpoutRetryService newRetryService() {
- return new KafkaSpoutRetryExponentialBackoff(new TimeInterval(500L, TimeUnit.MICROSECONDS),
- TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
- }
-
public static void main(String[] args) throws Exception {
new TridentKafkaClientTopologyNamedTopics().run(args);
}
@@ -109,7 +96,7 @@ public class TridentKafkaClientTopologyNamedTopics {
StormSubmitter.submitTopology(TOPIC_1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1));
StormSubmitter.submitTopology(TOPIC_2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2));
// Consumer
- KafkaSpoutConfig<String, String> spoutConfig = newKafkaSpoutConfig(brokerUrl);
+ KafkaTridentSpoutConfig<String, String> spoutConfig = newKafkaSpoutConfig(brokerUrl);
ITridentDataSource spout = isOpaque ? newKafkaTridentSpoutOpaque(spoutConfig) : newKafkaTridentSpoutTransactional(spoutConfig);
StormSubmitter.submitTopology("topics-consumer", tpConf,
TridentKafkaConsumerTopology.newTopology(spout));
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyWildcardTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyWildcardTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyWildcardTopics.java
index d770ac4..7da8f91 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyWildcardTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyWildcardTopics.java
@@ -18,11 +18,11 @@
package org.apache.storm.kafka.trident;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
@@ -33,15 +33,11 @@ public class TridentKafkaClientTopologyWildcardTopics extends TridentKafkaClient
private static final Pattern TOPIC_WILDCARD_PATTERN = Pattern.compile("test-trident(-1)?");
@Override
- protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig(String bootstrapServers) {
- return KafkaSpoutConfig.builder(bootstrapServers, TOPIC_WILDCARD_PATTERN)
- .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+ protected KafkaTridentSpoutConfig<String,String> newKafkaSpoutConfig(String bootstrapServers) {
+ return KafkaTridentSpoutConfig.builder(bootstrapServers, TOPIC_WILDCARD_PATTERN)
.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 200)
.setRecordTranslator((r) -> new Values(r.value()), new Fields("str"))
- .setRetry(newRetryService())
- .setOffsetCommitPeriodMs(10_000)
.setFirstPollOffsetStrategy(EARLIEST)
- .setMaxUncommittedOffsets(250)
.build();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java
index 67117e4..e269873 100755
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientHdfsTopo.java
@@ -29,6 +29,7 @@ import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.perf.utils.Helper;
@@ -80,7 +81,7 @@ public class KafkaClientHdfsTopo {
KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig.builder(bootstrapHosts, topicName)
.setFirstPollOffsetStrategy(
- KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
+ FirstPollOffsetStrategy.EARLIEST)
.build();
KafkaSpout<String, String> spout = new KafkaSpout<>(spoutConfig);
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java
----------------------------------------------------------------------
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java
index a75785d..4ac1ed8 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/KafkaClientSpoutNullBoltTopo.java
@@ -20,6 +20,7 @@ import java.util.Map;
import java.util.Optional;
import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
@@ -72,7 +73,7 @@ public class KafkaClientSpoutNullBoltTopo {
.setProcessingGuarantee(processingGuarantee)
.setOffsetCommitPeriodMs(offsetCommitPeriodMs)
.setFirstPollOffsetStrategy(
- KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
+ FirstPollOffsetStrategy.EARLIEST)
.setTupleTrackingEnforced(true)
.build();
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java
new file mode 100644
index 0000000..f2e14cb
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+/**
+ * Defines how the spout seeks the offset to be used in the first poll to Kafka upon topology deployment. By default this parameter is set
+ * to UNCOMMITTED_EARLIEST.
+ */
+public enum FirstPollOffsetStrategy {
+ /**
+ * The kafka spout polls records starting at the first offset of the partition, regardless of previous commits. This setting only takes
+ * effect on topology deployment.
+ */
+ EARLIEST,
+ /**
+ * The kafka spout polls records starting at the end of the partition, regardless of previous commits. This setting only takes effect on
+ * topology deployment.
+ */
+ LATEST,
+ /**
+ * The kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as EARLIEST.
+ */
+ UNCOMMITTED_EARLIEST,
+ /**
+ * The kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as LATEST.
+ */
+ UNCOMMITTED_LATEST;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 7d7a856..5ebef80 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -18,10 +18,10 @@
package org.apache.storm.kafka.spout;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
@@ -47,7 +47,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RetriableException;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
import org.apache.storm.kafka.spout.internal.ConsumerFactory;
@@ -145,7 +144,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
rebalanceListener = new KafkaSpoutConsumerRebalanceListener();
- consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
+ consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
tupleListener.open(conf, context);
if (canRegisterMetrics()) {
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index b17a47c..b6059b5 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -18,47 +18,34 @@
package org.apache.storm.kafka.spout;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.Config;
import org.apache.storm.annotation.InterfaceStability;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.kafka.spout.internal.CommonKafkaSpoutConfig;
import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
-import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
-import org.apache.storm.kafka.spout.subscription.PatternTopicFilter;
-import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
import org.apache.storm.kafka.spout.subscription.TopicFilter;
-import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics.
*/
-public class KafkaSpoutConfig<K, V> implements Serializable {
+public class KafkaSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> {
private static final long serialVersionUID = 141902646130682494L;
- // 200ms
- public static final long DEFAULT_POLL_TIMEOUT_MS = 200;
// 30s
public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;
// Retry forever
public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
// 10,000,000 records => 80MBs of memory footprint in the worst case
public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;
- // 2s
- public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
-
- public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
@@ -71,21 +58,11 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
public static final int DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS = 60;
-
- // Kafka consumer configuration
- private final Map<String, Object> kafkaProps;
- private final TopicFilter topicFilter;
- private final ManualPartitioner topicPartitioner;
- private final long pollTimeoutMs;
-
// Kafka spout configuration
- private final RecordTranslator<K, V> translator;
private final long offsetCommitPeriodMs;
private final int maxUncommittedOffsets;
- private final FirstPollOffsetStrategy firstPollOffsetStrategy;
private final KafkaSpoutRetryService retryService;
private final KafkaTupleListener tupleListener;
- private final long partitionRefreshPeriodMs;
private final boolean emitNullTuples;
private final ProcessingGuarantee processingGuarantee;
private final boolean tupleTrackingEnforced;
@@ -97,18 +74,11 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
* @param builder The Builder to construct the KafkaSpoutConfig from
*/
public KafkaSpoutConfig(Builder<K, V> builder) {
- setKafkaPropsForProcessingGuarantee(builder);
- this.kafkaProps = builder.kafkaProps;
- this.topicFilter = builder.topicFilter;
- this.topicPartitioner = builder.topicPartitioner;
- this.translator = builder.translator;
- this.pollTimeoutMs = builder.pollTimeoutMs;
+ super(builder.setKafkaPropsForProcessingGuarantee());
this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
- this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
this.retryService = builder.retryService;
this.tupleListener = builder.tupleListener;
- this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
this.emitNullTuples = builder.emitNullTuples;
this.processingGuarantee = builder.processingGuarantee;
this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
@@ -116,36 +86,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
}
/**
- * Defines how the {@link KafkaSpout} seeks the offset to be used in the first poll to Kafka upon topology deployment.
- * By default this parameter is set to UNCOMMITTED_EARLIEST.
- */
- public enum FirstPollOffsetStrategy {
- /**
- * The kafka spout polls records starting in the first offset of the partition, regardless of previous commits. This setting only
- * takes effect on topology deployment
- */
- EARLIEST,
- /**
- * The kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits. This
- * setting only takes effect on topology deployment
- */
- LATEST,
- /**
- * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as EARLIEST
- */
- UNCOMMITTED_EARLIEST,
- /**
- * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as LATEST
- */
- UNCOMMITTED_LATEST;
-
- @Override
- public String toString() {
- return "FirstPollOffsetStrategy{" + super.toString() + "}";
- }
- }
-
- /**
* This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
* i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
* The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
@@ -173,34 +113,27 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
NO_GUARANTEE,
}
- public static class Builder<K, V> {
+ public static class Builder<K, V> extends CommonKafkaSpoutConfig.Builder<K, V, Builder<K, V>> {
- private final Map<String, Object> kafkaProps;
- private final TopicFilter topicFilter;
- private final ManualPartitioner topicPartitioner;
- private RecordTranslator<K, V> translator;
- private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
- private FirstPollOffsetStrategy firstPollOffsetStrategy = DEFAULT_FIRST_POLL_OFFSET_STRATEGY;
private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE;
private KafkaTupleListener tupleListener = DEFAULT_TUPLE_LISTENER;
- private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
private boolean emitNullTuples = false;
private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE;
private boolean tupleTrackingEnforced = false;
private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS;
public Builder(String bootstrapServers, String... topics) {
- this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner());
+ super(bootstrapServers, topics);
}
public Builder(String bootstrapServers, Set<String> topics) {
- this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner());
+ super(bootstrapServers, topics);
}
public Builder(String bootstrapServers, Pattern topics) {
- this(bootstrapServers, new PatternTopicFilter(topics), new RoundRobinManualPartitioner());
+ super(bootstrapServers, topics);
}
/**
@@ -211,58 +144,11 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
* @param topicPartitioner The topic partitioner defining which topics and partitions are assinged to each spout task
*/
public Builder(String bootstrapServers, TopicFilter topicFilter, ManualPartitioner topicPartitioner) {
- kafkaProps = new HashMap<>();
- if (bootstrapServers == null || bootstrapServers.isEmpty()) {
- throw new IllegalArgumentException("bootstrap servers cannot be null");
- }
- kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- this.topicFilter = topicFilter;
- this.topicPartitioner = topicPartitioner;
- this.translator = new DefaultRecordTranslator<>();
- }
-
- /**
- * Set a {@link KafkaConsumer} property.
- */
- public Builder<K, V> setProp(String key, Object value) {
- kafkaProps.put(key, value);
- return this;
- }
-
- /**
- * Set multiple {@link KafkaConsumer} properties.
- */
- public Builder<K, V> setProp(Map<String, Object> props) {
- kafkaProps.putAll(props);
- return this;
- }
-
- /**
- * Set multiple {@link KafkaConsumer} properties.
- */
- public Builder<K, V> setProp(Properties props) {
- props.forEach((key, value) -> {
- if (key instanceof String) {
- kafkaProps.put((String) key, value);
- } else {
- throw new IllegalArgumentException("Kafka Consumer property keys must be Strings");
- }
- });
- return this;
+ super(bootstrapServers, topicFilter, topicPartitioner);
}
//Spout Settings
/**
- * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s.
- *
- * @param pollTimeoutMs time in ms
- */
- public Builder<K, V> setPollTimeoutMs(long pollTimeoutMs) {
- this.pollTimeoutMs = pollTimeoutMs;
- return this;
- }
-
- /**
* Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s.
*
* <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE} or
@@ -292,17 +178,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
}
/**
- * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start. Please refer to to the
- * documentation in {@link FirstPollOffsetStrategy}
- *
- * @param firstPollOffsetStrategy Offset used by Kafka spout first poll
- */
- public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
- this.firstPollOffsetStrategy = firstPollOffsetStrategy;
- return this;
- }
-
- /**
* Sets the retry service for the spout to use.
*
* <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
@@ -332,46 +207,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
return this;
}
- public Builder<K, V> setRecordTranslator(RecordTranslator<K, V> translator) {
- this.translator = translator;
- return this;
- }
-
- /**
- * Configure a translator with tuples to be emitted on the default stream.
- *
- * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
- * @param fields the names of the fields extracted
- * @return this to be able to chain configuration
- */
- public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
- return setRecordTranslator(new SimpleRecordTranslator<>(func, fields));
- }
-
- /**
- * Configure a translator with tuples to be emitted to a given stream.
- *
- * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
- * @param fields the names of the fields extracted
- * @param stream the stream to emit the tuples on
- * @return this to be able to chain configuration
- */
- public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
- return setRecordTranslator(new SimpleRecordTranslator<>(func, fields, stream));
- }
-
- /**
- * Sets partition refresh period in milliseconds. This is how often Kafka will be polled to check for new topics and/or new
- * partitions.
- *
- * @param partitionRefreshPeriodMs time in milliseconds
- * @return the builder (this)
- */
- public Builder<K, V> setPartitionRefreshPeriodMs(long partitionRefreshPeriodMs) {
- this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
- return this;
- }
-
/**
* Specifies if the spout should emit null tuples to the component downstream, or rather not emit and directly ack them. By default
* this parameter is set to false, which means that null tuples are not emitted.
@@ -417,6 +252,47 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this.metricsTimeBucketSizeInSecs = metricsTimeBucketSizeInSecs;
return this;
}
+
+ private Builder<K, V> withStringDeserializers() {
+ setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ return this;
+ }
+
+ private Builder<K, V> setKafkaPropsForProcessingGuarantee() {
+ if (getKafkaProps().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+ throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+ + " setting is not supported."
+ + " You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
+ }
+ String autoOffsetResetPolicy = (String) getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+ if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+ if (autoOffsetResetPolicy == null) {
+ /*
+ * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is
+ * configured for at-least-once processing we should default to seeking to the earliest offset in case there's an offset
+ * out of range error, rather than seeking to the latest (Kafka's default). This type of error will typically happen
+ * when the consumer requests an offset that was deleted.
+ */
+ LOG.info("Setting Kafka consumer property '{}' to 'earliest' to ensure at-least-once processing",
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+ setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
+ LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
+ + " Some messages may be skipped.");
+ }
+ } else if (processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
+ if (autoOffsetResetPolicy != null
+ && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
+ LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
+ + " Some messages may be processed more than once.");
+ }
+ }
+ LOG.info("Setting Kafka consumer property '{}' to 'false', because the spout does not support auto-commit",
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+ setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ return this;
+ }
public KafkaSpoutConfig<K, V> build() {
return new KafkaSpoutConfig<>(this);
@@ -431,7 +307,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
* @return The new builder
*/
public static Builder<String, String> builder(String bootstrapServers, String... topics) {
- return setStringDeserializers(new Builder<>(bootstrapServers, topics));
+ return new Builder<String, String>(bootstrapServers, topics).withStringDeserializers();
}
/**
@@ -442,7 +318,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
* @return The new builder
*/
public static Builder<String, String> builder(String bootstrapServers, Set<String> topics) {
- return setStringDeserializers(new Builder<>(bootstrapServers, topics));
+ return new Builder<String, String>(bootstrapServers, topics).withStringDeserializers();
}
/**
@@ -453,71 +329,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
* @return The new builder
*/
public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
- return setStringDeserializers(new Builder<>(bootstrapServers, topics));
- }
-
- private static Builder<String, String> setStringDeserializers(Builder<String, String> builder) {
- builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- return builder;
- }
-
- private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> builder) {
- if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
- throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
- + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
- }
- String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
- if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
- if (autoOffsetResetPolicy == null) {
- /*
- * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
- * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
- * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
- * requests an offset that was deleted.
- */
- LOG.info("Setting Kafka consumer property '{}' to 'earliest' to ensure at-least-once processing",
- ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
- builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
- LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
- + " Some messages may be skipped.");
- }
- } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
- if (autoOffsetResetPolicy != null
- && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
- LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
- + " Some messages may be processed more than once.");
- }
- }
- LOG.info("Setting Kafka consumer property '{}' to 'false', because the spout does not support auto-commit",
- ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
- builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- }
-
- /**
- * Gets the properties that will be passed to the KafkaConsumer.
- *
- * @return The Kafka properties map
- */
- public Map<String, Object> getKafkaProps() {
- return kafkaProps;
- }
-
- public TopicFilter getTopicFilter() {
- return topicFilter;
- }
-
- public ManualPartitioner getTopicPartitioner() {
- return topicPartitioner;
- }
-
- public RecordTranslator<K, V> getTranslator() {
- return translator;
- }
-
- public long getPollTimeoutMs() {
- return pollTimeoutMs;
+ return new Builder<String, String>(bootstrapServers, topics).withStringDeserializers();
}
public long getOffsetsCommitPeriodMs() {
@@ -533,11 +345,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
}
public String getConsumerGroupId() {
- return (String) kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG);
- }
-
- public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
- return firstPollOffsetStrategy;
+ return (String) getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG);
}
public int getMaxUncommittedOffsets() {
@@ -552,10 +360,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
return tupleListener;
}
- public long getPartitionRefreshPeriodMs() {
- return partitionRefreshPeriodMs;
- }
-
public boolean isEmitNullTuples() {
return emitNullTuples;
}
@@ -566,19 +370,15 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
@Override
public String toString() {
- return "KafkaSpoutConfig{"
- + "kafkaProps=" + kafkaProps
- + ", pollTimeoutMs=" + pollTimeoutMs
- + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
- + ", maxUncommittedOffsets=" + maxUncommittedOffsets
- + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy
- + ", topicFilter=" + topicFilter
- + ", topicPartitioner=" + topicPartitioner
- + ", translator=" + translator
- + ", retryService=" + retryService
- + ", tupleListener=" + tupleListener
- + ", processingGuarantee=" + processingGuarantee
- + ", metricsTimeBucketSizeInSecs=" + metricsTimeBucketSizeInSecs
- + '}';
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+ .append("offsetCommitPeriodMs", offsetCommitPeriodMs)
+ .append("maxUncommittedOffsets", maxUncommittedOffsets)
+ .append("retryService", retryService)
+ .append("tupleListener", tupleListener)
+ .append("processingGuarantee", processingGuarantee)
+ .append("emitNullTuples", emitNullTuples)
+ .append("tupleTrackingEnforced", tupleTrackingEnforced)
+ .append("metricsTimeBucketSizeInSecs", metricsTimeBucketSizeInSecs)
+ .toString();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java
new file mode 100644
index 0000000..9210d09
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java
@@ -0,0 +1,264 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.kafka.spout.DefaultRecordTranslator;
+import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
+import org.apache.storm.kafka.spout.Func;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.kafka.spout.SimpleRecordTranslator;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
+import org.apache.storm.kafka.spout.subscription.PatternTopicFilter;
+import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class CommonKafkaSpoutConfig<K, V> implements Serializable {
+ // 200ms
+ public static final long DEFAULT_POLL_TIMEOUT_MS = 200;
+ // 2s
+ public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
+
+ public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+
+ public static final Logger LOG = LoggerFactory.getLogger(CommonKafkaSpoutConfig.class);
+
+ // Kafka consumer configuration
+ private final Map<String, Object> kafkaProps;
+ private final TopicFilter topicFilter;
+ private final ManualPartitioner topicPartitioner;
+ private final long pollTimeoutMs;
+
+ // Kafka spout configuration
+ private final RecordTranslator<K, V> translator;
+ private final FirstPollOffsetStrategy firstPollOffsetStrategy;
+ private final long partitionRefreshPeriodMs;
+
+ /**
+ * Creates a new CommonKafkaSpoutConfig using a Builder.
+ *
+ * @param builder The Builder to construct the CommonKafkaSpoutConfig from
+ */
+ public CommonKafkaSpoutConfig(Builder<K, V, ?> builder) {
+ this.kafkaProps = builder.kafkaProps;
+ this.topicFilter = builder.topicFilter;
+ this.topicPartitioner = builder.topicPartitioner;
+ this.translator = builder.translator;
+ this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+ this.pollTimeoutMs = builder.pollTimeoutMs;
+ this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
+ }
+
+ public abstract static class Builder<K, V, T extends Builder<K, V, T>> {
+
+ private final Map<String, Object> kafkaProps;
+ private final TopicFilter topicFilter;
+ private final ManualPartitioner topicPartitioner;
+ private RecordTranslator<K, V> translator;
+ private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
+ private FirstPollOffsetStrategy firstPollOffsetStrategy = DEFAULT_FIRST_POLL_OFFSET_STRATEGY;
+ private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
+
+ public Builder(String bootstrapServers, String... topics) {
+ this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner());
+ }
+
+ public Builder(String bootstrapServers, Set<String> topics) {
+ this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner());
+ }
+
+ public Builder(String bootstrapServers, Pattern topics) {
+ this(bootstrapServers, new PatternTopicFilter(topics), new RoundRobinManualPartitioner());
+ }
+
+ /**
+ * Create a CommonKafkaSpoutConfig builder with default property values and no key/value deserializers.
+ *
+ * @param bootstrapServers The bootstrap servers the consumer will use
+ * @param topicFilter The topic filter defining which topics and partitions the spout will read
+ * @param topicPartitioner The topic partitioner defining which topics and partitions are assigned to each spout task
+ */
+ public Builder(String bootstrapServers, TopicFilter topicFilter, ManualPartitioner topicPartitioner) {
+ kafkaProps = new HashMap<>();
+ if (bootstrapServers == null || bootstrapServers.isEmpty()) {
+ throw new IllegalArgumentException("bootstrap servers cannot be null");
+ }
+ kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ this.topicFilter = topicFilter;
+ this.topicPartitioner = topicPartitioner;
+ this.translator = new DefaultRecordTranslator<>();
+ }
+
+ /**
+ * Set a {@link KafkaConsumer} property.
+ */
+ public T setProp(String key, Object value) {
+ kafkaProps.put(key, value);
+ return (T)this;
+ }
+
+ /**
+ * Set multiple {@link KafkaConsumer} properties.
+ */
+ public T setProp(Map<String, Object> props) {
+ kafkaProps.putAll(props);
+ return (T)this;
+ }
+
+ /**
+ * Set multiple {@link KafkaConsumer} properties.
+ */
+ public T setProp(Properties props) {
+ props.forEach((key, value) -> {
+ if (key instanceof String) {
+ kafkaProps.put((String) key, value);
+ } else {
+ throw new IllegalArgumentException("Kafka Consumer property keys must be Strings");
+ }
+ });
+ return (T)this;
+ }
+
+ //Spout Settings
+ /**
+ * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 200 ms.
+ *
+ * @param pollTimeoutMs time in ms
+ */
+ public T setPollTimeoutMs(long pollTimeoutMs) {
+ this.pollTimeoutMs = pollTimeoutMs;
+ return (T)this;
+ }
+
+ /**
+ * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start. Please refer to the
+ * documentation in {@link FirstPollOffsetStrategy}
+ *
+ * @param firstPollOffsetStrategy Offset used by Kafka spout first poll
+ */
+ public T setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
+ this.firstPollOffsetStrategy = firstPollOffsetStrategy;
+ return (T)this;
+ }
+
+ public T setRecordTranslator(RecordTranslator<K, V> translator) {
+ this.translator = translator;
+ return (T)this;
+ }
+
+ /**
+ * Configure a translator with tuples to be emitted on the default stream.
+ *
+ * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
+ * @param fields the names of the fields extracted
+ * @return this to be able to chain configuration
+ */
+ public T setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+ return setRecordTranslator(new SimpleRecordTranslator<>(func, fields));
+ }
+
+ /**
+ * Configure a translator with tuples to be emitted to a given stream.
+ *
+ * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
+ * @param fields the names of the fields extracted
+ * @param stream the stream to emit the tuples on
+ * @return this to be able to chain configuration
+ */
+ public T setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+ return setRecordTranslator(new SimpleRecordTranslator<>(func, fields, stream));
+ }
+
+ /**
+ * Sets partition refresh period in milliseconds. This is how often Kafka will be polled to check for new topics and/or new
+ * partitions.
+ *
+ * @param partitionRefreshPeriodMs time in milliseconds
+ * @return the builder (this)
+ */
+ public T setPartitionRefreshPeriodMs(long partitionRefreshPeriodMs) {
+ this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
+ return (T)this;
+ }
+
+ protected Map<String, Object> getKafkaProps() {
+ return kafkaProps;
+ }
+
+ public abstract CommonKafkaSpoutConfig<K, V> build();
+ }
+
+ /**
+ * Gets the properties that will be passed to the KafkaConsumer.
+ *
+ * @return The Kafka properties map
+ */
+ public Map<String, Object> getKafkaProps() {
+ return kafkaProps;
+ }
+
+ public TopicFilter getTopicFilter() {
+ return topicFilter;
+ }
+
+ public ManualPartitioner getTopicPartitioner() {
+ return topicPartitioner;
+ }
+
+ public RecordTranslator<K, V> getTranslator() {
+ return translator;
+ }
+
+ public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
+ return firstPollOffsetStrategy;
+ }
+
+ public long getPollTimeoutMs() {
+ return pollTimeoutMs;
+ }
+
+ public long getPartitionRefreshPeriodMs() {
+ return partitionRefreshPeriodMs;
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+ .append("kafkaProps", kafkaProps)
+ .append("partitionRefreshPeriodMs", partitionRefreshPeriodMs)
+ .append("pollTimeoutMs", pollTimeoutMs)
+ .append("topicFilter", topicFilter)
+ .append("topicPartitioner", topicPartitioner)
+ .append("translator", translator)
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java
index 5ca7080..469a5f6 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java
@@ -17,12 +17,12 @@
package org.apache.storm.kafka.spout.internal;
import java.io.Serializable;
+import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
/**
* This is here to enable testing.
*/
public interface ConsumerFactory<K, V> extends Serializable {
- public Consumer<K,V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig);
+ public Consumer<K,V> createConsumer(Map<String, Object> consumerProps);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java
index c384376..382808b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java
@@ -16,14 +16,14 @@
package org.apache.storm.kafka.spout.internal;
+import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
public class ConsumerFactoryDefault<K, V> implements ConsumerFactory<K, V> {
@Override
- public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
- return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps());
+ public KafkaConsumer<K, V> createConsumer(Map<String, Object> consumerProps) {
+ return new KafkaConsumer<>(consumerProps);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutConfig.java
new file mode 100644
index 0000000..f62040f
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutConfig.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.storm.kafka.spout.internal.CommonKafkaSpoutConfig;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+
+/**
+ * Defines the required Kafka-related configuration for the Trident spouts.
+ */
+public class KafkaTridentSpoutConfig<K, V> extends CommonKafkaSpoutConfig<K, V> {
+
+ private static final long serialVersionUID = 1L;
+
+ public KafkaTridentSpoutConfig(Builder<K, V> builder) {
+ super(builder);
+ }
+
+ /**
+ * Factory method that creates a Builder with String key/value deserializers.
+ *
+ * @param bootstrapServers The bootstrap servers for the consumer
+ * @param topics The topics to subscribe to
+ * @return The new builder
+ */
+ public static Builder<String, String> builder(String bootstrapServers, String... topics) {
+ return new Builder<String, String>(bootstrapServers, topics).withStringDeserializers();
+ }
+
+ /**
+ * Factory method that creates a Builder with String key/value deserializers.
+ *
+ * @param bootstrapServers The bootstrap servers for the consumer
+ * @param topics The topics to subscribe to
+ * @return The new builder
+ */
+ public static Builder<String, String> builder(String bootstrapServers, Set<String> topics) {
+ return new Builder<String, String>(bootstrapServers, topics).withStringDeserializers();
+ }
+
+ /**
+ * Factory method that creates a Builder with String key/value deserializers.
+ *
+ * @param bootstrapServers The bootstrap servers for the consumer
+ * @param topics The topic pattern to subscribe to
+ * @return The new builder
+ */
+ public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
+ return new Builder<String, String>(bootstrapServers, topics).withStringDeserializers();
+ }
+
+ public static class Builder<K, V> extends CommonKafkaSpoutConfig.Builder<K, V, Builder<K, V>> {
+
+ public Builder(String bootstrapServers, String... topics) {
+ super(bootstrapServers, topics);
+ }
+
+ public Builder(String bootstrapServers, Set<String> topics) {
+ super(bootstrapServers, topics);
+ }
+
+ public Builder(String bootstrapServers, Pattern topics) {
+ super(bootstrapServers, topics);
+ }
+
+ public Builder(String bootstrapServers, TopicFilter topicFilter, ManualPartitioner topicPartitioner) {
+ super(bootstrapServers, topicFilter, topicPartitioner);
+ }
+
+ private Builder<K, V> withStringDeserializers() {
+ setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ return this;
+ }
+
+ @Override
+ public KafkaTridentSpoutConfig<K, V> build() {
+ return new KafkaTridentSpoutConfig<>(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java
index 4e46d4c..d8f097b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java
@@ -44,7 +44,7 @@ public class KafkaTridentSpoutCoordinator<K,V> implements
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutCoordinator.class);
private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
- private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+ private final KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig;
private final Timer refreshAssignmentTimer;
private final Consumer<K, V> consumer;
@@ -54,14 +54,14 @@ public class KafkaTridentSpoutCoordinator<K,V> implements
* Creates a new coordinator based on the given spout config.
* @param kafkaSpoutConfig The spout config to use
*/
- public KafkaTridentSpoutCoordinator(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+ public KafkaTridentSpoutCoordinator(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig) {
this(kafkaSpoutConfig, new ConsumerFactoryDefault<>());
}
- KafkaTridentSpoutCoordinator(KafkaSpoutConfig<K, V> kafkaSpoutConfig, ConsumerFactory<K, V> consumerFactory) {
+ KafkaTridentSpoutCoordinator(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, ConsumerFactory<K, V> consumerFactory) {
this.kafkaSpoutConfig = kafkaSpoutConfig;
this.refreshAssignmentTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
- this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig);
+ this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
LOG.debug("Created {}", this.toString());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index 22f21c7..ed86136 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -18,10 +18,10 @@
package org.apache.storm.kafka.spout.trident;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
@@ -39,7 +39,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.RecordTranslator;
import org.apache.storm.kafka.spout.TopicPartitionComparator;
import org.apache.storm.kafka.spout.internal.ConsumerFactory;
@@ -58,14 +58,14 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
// Kafka
private final Consumer<K, V> consumer;
- private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+ private final KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig;
private final TopicAssigner topicAssigner;
// The first seek offset for each topic partition, i.e. the offset this spout instance started processing at.
private final Map<TopicPartition, Long> tpToFirstSeekOffset = new HashMap<>();
private final long pollTimeoutMs;
- private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
+ private final FirstPollOffsetStrategy firstPollOffsetStrategy;
private final RecordTranslator<K, V> translator;
private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
private final TopologyContext topologyContext;
@@ -76,15 +76,15 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
* @param kafkaSpoutConfig The kafka spout config
* @param topologyContext The topology context
*/
- public KafkaTridentSpoutEmitter(KafkaSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext) {
+ public KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext) {
this(kafkaSpoutConfig, topologyContext, new ConsumerFactoryDefault<>(), new TopicAssigner());
}
@VisibleForTesting
- KafkaTridentSpoutEmitter(KafkaSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext,
+ KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext,
ConsumerFactory<K, V> consumerFactory, TopicAssigner topicAssigner) {
this.kafkaSpoutConfig = kafkaSpoutConfig;
- this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig);
+ this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());
this.topologyContext = topologyContext;
this.translator = kafkaSpoutConfig.getTranslator();
this.topicAssigner = topicAssigner;
@@ -191,8 +191,8 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
}
private boolean isFirstPollOffsetStrategyIgnoringCommittedOffsets() {
- return firstPollOffsetStrategy == KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST
- || firstPollOffsetStrategy == KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+ return firstPollOffsetStrategy == FirstPollOffsetStrategy.EARLIEST
+ || firstPollOffsetStrategy == FirstPollOffsetStrategy.LATEST;
}
private void throwIfEmittingForUnassignedPartition(TopicPartition currBatchTp) {
@@ -347,14 +347,14 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
- LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
- kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
+ LOG.info("Partitions revoked. [consumer={}, topic-partitions={}]",
+ consumer, partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
- kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
+ LOG.info("Partitions reassignment. [consumer={}, topic-partitions={}]",
+ consumer, partitions);
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
index 420e6f1..e20c89b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
@@ -35,13 +35,13 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
- private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+ private final KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig;
private final OutputFieldsExtractor outputFieldsExtractor;
/**
* Creates a new opaque transactional Trident Kafka spout.
*/
- public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+ public KafkaTridentSpoutOpaque(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig) {
this.kafkaSpoutConfig = kafkaSpoutConfig;
this.outputFieldsExtractor = new OutputFieldsExtractor();
LOG.debug("Created {}", this.toString());
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
index 2d1e9de..2eb3f29 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
@@ -30,13 +30,13 @@ public class KafkaTridentSpoutTransactional<K,V> implements IPartitionedTridentS
Serializable {
private static final long serialVersionUID = 1L;
- private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+ private final KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig;
private final OutputFieldsExtractor outputFieldsExtractor;
/**
* Creates a new non-opaque transactional Trident Kafka spout.
*/
- public KafkaTridentSpoutTransactional(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+ public KafkaTridentSpoutTransactional(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig) {
this.kafkaSpoutConfig = kafkaSpoutConfig;
this.outputFieldsExtractor = new OutputFieldsExtractor();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/internal/OutputFieldsExtractor.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/internal/OutputFieldsExtractor.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/internal/OutputFieldsExtractor.java
index 93a2a05..e702fef 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/internal/OutputFieldsExtractor.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/internal/OutputFieldsExtractor.java
@@ -17,8 +17,8 @@
package org.apache.storm.kafka.spout.trident.internal;
import java.io.Serializable;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
import org.apache.storm.tuple.Fields;
public class OutputFieldsExtractor implements Serializable {
@@ -28,7 +28,7 @@ public class OutputFieldsExtractor implements Serializable {
* Extract the output fields from the config.
* Throws an error if there are multiple declared output streams, since Trident only supports one output stream per spout.
*/
- public <K, V> Fields getOutputFields(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+ public <K, V> Fields getOutputFields(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig) {
RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
int numStreams = translator.streams().size();
if (numStreams > 1) {
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
index f51b159..b05f132 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
@@ -95,7 +95,7 @@ public abstract class KafkaSpoutAbstractTest {
return new ConsumerFactory<String, String>() {
@Override
- public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
+ public KafkaConsumer<String, String> createConsumer(Map<String, Object> consumerProps) {
return consumerSpy;
}
@@ -103,7 +103,7 @@ public abstract class KafkaSpoutAbstractTest {
}
KafkaConsumer<String, String> createConsumerSpy() {
- return spy(new ConsumerFactoryDefault<String, String>().createConsumer(spoutConfig));
+ return spy(new ConsumerFactoryDefault<String, String>().createConsumer(spoutConfig.getKafkaProps()));
}
@AfterEach
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index e168f07..90a1f1b 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -27,7 +27,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.HashMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
index 27c7372..af9f221 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.KafkaUnitExtension;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.ConsumerFactory;
import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
@@ -44,7 +43,6 @@ import org.apache.storm.kafka.spout.subscription.TopicAssigner;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -78,7 +76,7 @@ public class KafkaSpoutReactivationTest {
.setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
.build();
ConsumerFactory<String, String> consumerFactory = new ConsumerFactoryDefault<>();
- this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig));
+ this.consumerSpy = spy(consumerFactory.createConsumer(spoutConfig.getKafkaProps()));
ConsumerFactory<String, String> consumerFactoryMock = mock(ConsumerFactory.class);
when(consumerFactoryMock.createConsumer(any()))
.thenReturn(consumerSpy);
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 5740b3f..111b4c3 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -208,7 +208,7 @@ public class KafkaSpoutRebalanceTest {
TopicAssigner assignerMock = mock(TopicAssigner.class);
KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(topicFilterMock, partitionerMock, -1)
- .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
+ .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
.build(), consumerFactory, assignerMock);
String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
TopicPartition assignedPartition = new TopicPartition(topic, 1);
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
index f45aaec..3c3c106 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyDeployActivateDeactivateTest.java
@@ -36,7 +36,7 @@ public class KafkaSpoutTopologyDeployActivateDeactivateTest extends KafkaSpoutAb
KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitExtension.getKafkaUnit().getKafkaPort(),
Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC)))
.setOffsetCommitPeriodMs(commitOffsetPeriodMs)
- .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
+ .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
.build();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index d7febc0..547a27e 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -82,7 +82,7 @@ public class MaxUncommittedOffsetTest {
//The spout must be able to reemit all retriable tuples, even if the maxPollRecords is set to a low value compared to maxUncommittedOffsets.
assertThat("Current tests require maxPollRecords < maxUncommittedOffsets", maxPollRecords, lessThanOrEqualTo(maxUncommittedOffsets));
spout = new KafkaSpout<>(spoutConfig);
- new ConsumerFactoryDefault<String, String>().createConsumer(spoutConfig);
+ new ConsumerFactoryDefault<String, String>().createConsumer(spoutConfig.getKafkaProps());
}
private void prepareSpout(int msgCount) throws Exception {
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
index f7e0a96..7c2e588 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
@@ -17,8 +17,8 @@
*/
package org.apache.storm.kafka.spout.config.builder;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
index 7f95b74..0f419be 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
@@ -16,8 +16,6 @@
package org.apache.storm.kafka.spout.trident;
-import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutBatchMetadata;
-
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;