You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/07/19 18:25:05 UTC
[2/4] storm git commit: STORM-2542: Remove storm-kafka-client
KafkaConsumer.subscribe API option
STORM-2542: Remove storm-kafka-client KafkaConsumer.subscribe API option
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fdb649e3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fdb649e3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fdb649e3
Branch: refs/heads/master
Commit: fdb649e352e05fd849cafa312bbd62fc75694579
Parents: cd6ca3e
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Mon Jun 5 14:59:19 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Jul 19 00:18:03 2017 +0200
----------------------------------------------------------------------
docs/storm-kafka-client.md | 7 +-
.../storm/kafka/spout/KafkaSpoutConfig.java | 175 ++++++++++---------
.../spout/ManualPartitionSubscription.java | 71 --------
.../storm/kafka/spout/ManualPartitioner.java | 40 -----
.../storm/kafka/spout/NamedSubscription.java | 61 -------
.../storm/kafka/spout/NamedTopicFilter.java | 67 -------
.../storm/kafka/spout/PatternSubscription.java | 54 ------
.../storm/kafka/spout/PatternTopicFilter.java | 69 --------
.../spout/RoundRobinManualPartitioner.java | 50 ------
.../apache/storm/kafka/spout/Subscription.java | 53 ------
.../apache/storm/kafka/spout/TopicFilter.java | 38 ----
.../ManualPartitionSubscription.java | 72 ++++++++
.../spout/subscription/ManualPartitioner.java | 40 +++++
.../spout/subscription/NamedTopicFilter.java | 67 +++++++
.../spout/subscription/PatternTopicFilter.java | 69 ++++++++
.../RoundRobinManualPartitioner.java | 50 ++++++
.../kafka/spout/subscription/Subscription.java | 53 ++++++
.../kafka/spout/subscription/TopicFilter.java | 38 ++++
.../storm/kafka/spout/KafkaSpoutCommitTest.java | 36 ++--
.../storm/kafka/spout/KafkaSpoutEmitTest.java | 48 ++---
.../kafka/spout/KafkaSpoutRebalanceTest.java | 37 ++--
.../kafka/spout/KafkaSpoutRetryLimitTest.java | 74 ++++----
.../kafka/spout/MaxUncommittedOffsetTest.java | 7 +-
.../storm/kafka/spout/NamedTopicFilterTest.java | 69 --------
.../kafka/spout/PatternTopicFilterTest.java | 73 --------
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 9 +-
.../SpoutWithMockedConsumerSetupHelper.java | 74 ++++++++
.../SingleTopicKafkaSpoutConfiguration.java | 44 +++--
.../subscription/NamedTopicFilterTest.java | 68 +++++++
.../subscription/PatternTopicFilterTest.java | 73 ++++++++
30 files changed, 827 insertions(+), 859 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index ada8619..99b9ae5 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -240,12 +240,9 @@ streams. If you are doing this for Trident a value must be in the List returned
otherwise trident can throw exceptions.
-### Manual Partition Control (ADVANCED)
+### Manual Partition Assignment (ADVANCED)
-By default Kafka will automatically assign partitions to the current set of spouts. It handles lots of things, but in some cases you may want to manually assign the partitions.
-This can cause less churn in the assignments when spouts go down and come back up, but it can result in a lot of issues if not done right. This can all be handled by subclassing
-Subscription and we have a few implementations that you can look at for examples on how to do this. ManualPartitionNamedSubscription and ManualPartitionPatternSubscription. Again
-please be careful when using these or implementing your own.
+By default the KafkaSpout instances will be assigned partitions using a round robin strategy. If you need to customize partition assignment, you must implement the `ManualPartitioner` interface. The implementation can be passed to the `ManualPartitionSubscription` constructor, and the `Subscription` can then be set in the `KafkaSpoutConfig` via the `KafkaSpoutConfig.Builder` constructor. Please take care when supplying a custom implementation, since an incorrect `ManualPartitioner` implementation could leave some partitions unread, or concurrently read by multiple spout instances. See the `RoundRobinManualPartitioner` for an example of how to implement this functionality.
## Use the Maven Shade Plugin to Build the Uber Jar
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 6f09f5f..72fa52e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -24,39 +24,41 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription;
+import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
+import org.apache.storm.kafka.spout.subscription.PatternTopicFilter;
+import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.Subscription;
import org.apache.storm.tuple.Fields;
/**
* KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics.
*/
public class KafkaSpoutConfig<K, V> implements Serializable {
+
private static final long serialVersionUID = 141902646130682494L;
// 200ms
- public static final long DEFAULT_POLL_TIMEOUT_MS = 200;
+ public static final long DEFAULT_POLL_TIMEOUT_MS = 200;
// 30s
- public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;
+ public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;
// Retry forever
- public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
+ public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
// 10,000,000 records => 80MBs of memory footprint in the worst case
- public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;
+ public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;
// 2s
- public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
+ public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
- public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
- new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
- DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
- /**
- * Retry in a tight loop (keep unit tests fasts) do not use in production.
- */
- public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
- new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0),
- DEFAULT_MAX_RETRIES, TimeInterval.milliSeconds(0));
-
+ public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
+ new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+ DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
+
// Kafka consumer configuration
private final Map<String, Object> kafkaProps;
private final Subscription subscription;
@@ -73,9 +75,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
/**
* Creates a new KafkaSpoutConfig using a Builder.
+ *
* @param builder The Builder to construct the KafkaSpoutConfig from
*/
- public KafkaSpoutConfig(Builder<K,V> builder) {
+ public KafkaSpoutConfig(Builder<K, V> builder) {
this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
this.subscription = builder.subscription;
this.translator = builder.translator;
@@ -108,12 +111,13 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
EARLIEST,
LATEST,
UNCOMMITTED_EARLIEST,
- UNCOMMITTED_LATEST
+ UNCOMMITTED_LATEST
}
-
- public static class Builder<K,V> {
+
+ public static class Builder<K, V> {
+
private final Map<String, Object> kafkaProps;
- private Subscription subscription;
+ private final Subscription subscription;
private RecordTranslator<K, V> translator;
private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
@@ -123,20 +127,22 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
private boolean emitNullTuples = false;
- public Builder(String bootstrapServers, String ... topics) {
- this(bootstrapServers, new NamedSubscription(topics));
+ public Builder(String bootstrapServers, String... topics) {
+ this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
}
-
- public Builder(String bootstrapServers, Collection<String> topics) {
- this(bootstrapServers, new NamedSubscription(topics));
+
+ public Builder(String bootstrapServers, Set<String> topics) {
+ this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(),
+ new NamedTopicFilter(topics)));
}
-
+
public Builder(String bootstrapServers, Pattern topics) {
- this(bootstrapServers, new PatternSubscription(topics));
+ this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
}
-
+
/**
* Create a KafkaSpoutConfig builder with default property values and no key/value deserializers.
+ *
* @param bootstrapServers The bootstrap servers the consumer will use
* @param subscription The subscription defining which topics and partitions each spout instance will read.
*/
@@ -149,30 +155,30 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this.subscription = subscription;
this.translator = new DefaultRecordTranslator<>();
}
-
+
/**
- * Set a {@link KafkaConsumer} property.
+ * Set a {@link KafkaConsumer} property.
*/
- public Builder<K,V> setProp(String key, Object value) {
+ public Builder<K, V> setProp(String key, Object value) {
kafkaProps.put(key, value);
return this;
}
-
+
/**
* Set multiple {@link KafkaConsumer} properties.
*/
- public Builder<K,V> setProp(Map<String, Object> props) {
+ public Builder<K, V> setProp(Map<String, Object> props) {
kafkaProps.putAll(props);
return this;
}
-
+
/**
* Set multiple {@link KafkaConsumer} properties.
*/
- public Builder<K,V> setProp(Properties props) {
+ public Builder<K, V> setProp(Properties props) {
props.forEach((key, value) -> {
if (key instanceof String) {
- kafkaProps.put((String)key, value);
+ kafkaProps.put((String) key, value);
} else {
throw new IllegalArgumentException("Kafka Consumer property keys must be Strings");
}
@@ -183,46 +189,51 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
//Spout Settings
/**
* Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s.
+ *
* @param pollTimeoutMs time in ms
*/
- public Builder<K,V> setPollTimeoutMs(long pollTimeoutMs) {
+ public Builder<K, V> setPollTimeoutMs(long pollTimeoutMs) {
this.pollTimeoutMs = pollTimeoutMs;
return this;
}
/**
* Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s.
+ *
* @param offsetCommitPeriodMs time in ms
*/
- public Builder<K,V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
+ public Builder<K, V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
this.offsetCommitPeriodMs = offsetCommitPeriodMs;
return this;
}
/**
- * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
- * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
- * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
- * Note that this limit can in some cases be exceeded, but no partition will exceed this limit by more than maxPollRecords - 1.
+ * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place. Once this
+ * limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number of pending offsets
+ * below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}. Note that this limit can in some cases be exceeded,
+ * but no partition will exceed this limit by more than maxPollRecords - 1.
+ *
+ * @param maxUncommittedOffsets max number of records that can be pending commit
*/
- public Builder<K,V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
+ public Builder<K, V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
this.maxUncommittedOffsets = maxUncommittedOffsets;
return this;
}
/**
- * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start.
- * Please refer to to the documentation in {@link FirstPollOffsetStrategy}
+ * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start. Please refer to the
+ * documentation in {@link FirstPollOffsetStrategy}
+ *
* @param firstPollOffsetStrategy Offset used by Kafka spout first poll
- * */
+ */
public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
this.firstPollOffsetStrategy = firstPollOffsetStrategy;
return this;
}
-
+
/**
* Sets the retry service for the spout to use.
+ *
* @param retryService the new retry service
* @return the builder (this).
*/
@@ -238,9 +249,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this.translator = translator;
return this;
}
-
+
/**
* Configure a translator with tuples to be emitted on the default stream.
+ *
* @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
* @param fields the names of the fields extracted
* @return this to be able to chain configuration
@@ -248,9 +260,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
return setRecordTranslator(new SimpleRecordTranslator<>(func, fields));
}
-
+
/**
* Configure a translator with tuples to be emitted to a given stream.
+ *
* @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
* @param fields the names of the fields extracted
* @param stream the stream to emit the tuples on
@@ -259,12 +272,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
return setRecordTranslator(new SimpleRecordTranslator<>(func, fields, stream));
}
-
+
/**
- * Sets partition refresh period in milliseconds. This is how often kafka will be polled
- * to check for new topics and/or new partitions.
- * This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and
+ * Sets partition refresh period in milliseconds. This is how often kafka will be polled to check for new topics and/or new
+ * partitions. This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and
* PatternSubscription rely on kafka to handle this instead.
+ *
* @param partitionRefreshPeriodMs time in milliseconds
* @return the builder (this)
*/
@@ -274,8 +287,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
}
/**
- * Specifies if the spout should emit null tuples to the component downstream, or rather not emit and directly
- * ack them. By default this parameter is set to false, which means that null tuples are not emitted.
+ * Specifies if the spout should emit null tuples to the component downstream, or rather not emit and directly ack them. By default
+ * this parameter is set to false, which means that null tuples are not emitted.
+ *
* @param emitNullTuples sets if null tuples should or not be emitted downstream
*/
public Builder<K, V> setEmitNullTuples(boolean emitNullTuples) {
@@ -283,34 +297,36 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
return this;
}
- public KafkaSpoutConfig<K,V> build() {
+ public KafkaSpoutConfig<K, V> build() {
return new KafkaSpoutConfig<>(this);
}
}
-
-
+
/**
* Factory method that creates a Builder with String key/value deserializers.
+ *
* @param bootstrapServers The bootstrap servers for the consumer
* @param topics The topics to subscribe to
* @return The new builder
*/
- public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
+ public static Builder<String, String> builder(String bootstrapServers, String... topics) {
return setStringDeserializers(new Builder<>(bootstrapServers, topics));
}
-
+
/**
* Factory method that creates a Builder with String key/value deserializers.
+ *
* @param bootstrapServers The bootstrap servers for the consumer
* @param topics The topics to subscribe to
* @return The new builder
*/
- public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
+ public static Builder<String, String> builder(String bootstrapServers, Set<String> topics) {
return setStringDeserializers(new Builder<>(bootstrapServers, topics));
}
-
+
/**
* Factory method that creates a Builder with String key/value deserializers.
+ *
* @param bootstrapServers The bootstrap servers for the consumer
* @param topics The topic pattern to subscribe to
* @return The new builder
@@ -318,13 +334,13 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
return setStringDeserializers(new Builder<>(bootstrapServers, topics));
}
-
+
private static Builder<String, String> setStringDeserializers(Builder<String, String> builder) {
builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return builder;
}
-
+
private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
// set defaults for properties not specified
if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
@@ -335,17 +351,18 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
/**
* Gets the properties that will be passed to the KafkaConsumer.
+ *
* @return The Kafka properties map
*/
public Map<String, Object> getKafkaProps() {
return kafkaProps;
}
-
+
public Subscription getSubscription() {
return subscription;
}
-
- public RecordTranslator<K,V> getTranslator() {
+
+ public RecordTranslator<K, V> getTranslator() {
return translator;
}
@@ -358,8 +375,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
}
public boolean isConsumerAutoCommitMode() {
- return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false
- || Boolean.valueOf((String)kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+ return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false
+ || Boolean.valueOf((String) kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}
public String getConsumerGroupId() {
@@ -377,7 +394,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
public KafkaSpoutRetryService getRetryService() {
return retryService;
}
-
+
public long getPartitionRefreshPeriodMs() {
return partitionRefreshPeriodMs;
}
@@ -389,14 +406,14 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
@Override
public String toString() {
return "KafkaSpoutConfig{"
- + "kafkaProps=" + kafkaProps
- + ", pollTimeoutMs=" + pollTimeoutMs
- + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
- + ", maxUncommittedOffsets=" + maxUncommittedOffsets
- + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy
- + ", subscription=" + subscription
- + ", translator=" + translator
- + ", retryService=" + retryService
- + '}';
+ + "kafkaProps=" + kafkaProps
+ + ", pollTimeoutMs=" + pollTimeoutMs
+ + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
+ + ", maxUncommittedOffsets=" + maxUncommittedOffsets
+ + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy
+ + ", subscription=" + subscription
+ + ", translator=" + translator
+ + ", retryService=" + retryService
+ + '}';
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
deleted file mode 100644
index 2c65d6d..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.task.TopologyContext;
-
-public class ManualPartitionSubscription extends Subscription {
- private static final long serialVersionUID = 5633018073527583826L;
- private final ManualPartitioner partitioner;
- private final TopicFilter partitionFilter;
- private Set<TopicPartition> currentAssignment = null;
- private KafkaConsumer<?, ?> consumer = null;
- private ConsumerRebalanceListener listener = null;
- private TopologyContext context = null;
-
- public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) {
- this.partitionFilter = partitionFilter;
- this.partitioner = parter;
- }
-
- @Override
- public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
- this.consumer = consumer;
- this.listener = listener;
- this.context = context;
- refreshAssignment();
- }
-
- @Override
- public void refreshAssignment() {
- List<TopicPartition> allPartitions = partitionFilter.getFilteredTopicPartitions(consumer);
- Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
- Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context));
- if (!newAssignment.equals(currentAssignment)) {
- consumer.assign(newAssignment);
- if (currentAssignment != null) {
- listener.onPartitionsRevoked(currentAssignment);
- }
- currentAssignment = newAssignment;
- listener.onPartitionsAssigned(newAssignment);
- }
- }
-
- @Override
- public String getTopicsString() {
- return partitionFilter.getTopicsString();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
deleted file mode 100644
index 4856687..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.List;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.task.TopologyContext;
-
-/**
- * A function used to assign partitions to this spout.
- * WARNING if this is not done correctly you can really mess things up, like not reading data in some partitions.
- * The complete TopologyContext is passed in, but it is suggested that you use the index of the spout and the total
- * number of spouts to avoid missing partitions or double assigning partitions.
- */
-@FunctionalInterface
-public interface ManualPartitioner {
- /**
- * Get the partitions for this assignment
- * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering
- * @param context the context of the topology
- * @return the subset of the partitions that this spout should use.
- */
- public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
deleted file mode 100644
index 0eb48cb..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.task.TopologyContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Subscribe to all topics that follow a given list of values.
- */
-public class NamedSubscription extends Subscription {
- private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class);
- private static final long serialVersionUID = 3438543305215813839L;
- protected final Collection<String> topics;
-
- public NamedSubscription(Collection<String> topics) {
- this.topics = Collections.unmodifiableCollection(new ArrayList<>(topics));
- }
-
- public NamedSubscription(String ... topics) {
- this(Arrays.asList(topics));
- }
-
- @Override
- public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) {
- consumer.subscribe(topics, listener);
- LOG.info("Kafka consumer subscribed topics {}", topics);
-
- // Initial poll to get the consumer registration process going.
- // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
- consumer.poll(0);
- }
-
- @Override
- public String getTopicsString() {
- return String.join(",", topics);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
deleted file mode 100644
index 982828d..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * Filter that returns all partitions for the specified topics.
- */
-public class NamedTopicFilter implements TopicFilter {
-
- private final Set<String> topics;
-
- /**
- * Create filter based on a set of topic names.
- * @param topics The topic names the filter will pass.
- */
- public NamedTopicFilter(Set<String> topics) {
- this.topics = Collections.unmodifiableSet(topics);
- }
-
- /**
- * Convenience constructor.
- * @param topics The topic names the filter will pass.
- */
- public NamedTopicFilter(String... topics) {
- this(new HashSet<>(Arrays.asList(topics)));
- }
-
- @Override
- public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
- List<TopicPartition> allPartitions = new ArrayList<>();
- for (String topic : topics) {
- for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) {
- allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
- }
- }
- return allPartitions;
- }
-
- @Override
- public String getTopicsString() {
- return String.join(",", topics);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
deleted file mode 100644
index ec53f01..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.task.TopologyContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Subscribe to all topics that match a given pattern.
- */
-public class PatternSubscription extends Subscription {
- private static final Logger LOG = LoggerFactory.getLogger(PatternSubscription.class);
- private static final long serialVersionUID = 3438543305215813839L;
- protected final Pattern pattern;
-
- public PatternSubscription(Pattern pattern) {
- this.pattern = pattern;
- }
-
- @Override
- public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) {
- consumer.subscribe(pattern, listener);
- LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern);
-
- // Initial poll to get the consumer registration process going.
- // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
- consumer.poll(0);
- }
-
- @Override
- public String getTopicsString() {
- return pattern.pattern();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
deleted file mode 100644
index 2964874..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * Filter that returns all partitions for topics matching the given {@link Pattern}.
- */
-public class PatternTopicFilter implements TopicFilter {
-
- private final Pattern pattern;
- private final Set<String> topics = new HashSet<>();
-
- /**
- * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter.
- *
- * @param pattern The Pattern to use.
- */
- public PatternTopicFilter(Pattern pattern) {
- this.pattern = pattern;
- }
-
- @Override
- public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
- topics.clear();
- List<TopicPartition> allPartitions = new ArrayList<>();
- for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
- if (pattern.matcher(entry.getKey()).matches()) {
- for (PartitionInfo partitionInfo : entry.getValue()) {
- allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
- topics.add(partitionInfo.topic());
- }
- }
- }
- return allPartitions;
- }
-
- @Override
- public String getTopicsString() {
- return String.join(",", topics);
- }
-
- public String getTopicsPattern() {
- return pattern.pattern();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
deleted file mode 100644
index 4afcc49..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.task.TopologyContext;
-
-/**
- * Assign partitions in a round robin fashion for all spouts,
- * not just the ones that are alive. Because the parallelism of
- * the spouts does not typically change while running this makes
- * the assignments more stable in the face of crashing spouts.
- * <p/>
- * Round Robin means that first spout of N spouts will get the first
- * partition, and the N+1th partition... The second spout will get the second partition and
- * N+2th partition etc.
- */
-public class RoundRobinManualPartitioner implements ManualPartitioner {
-
- @Override
- public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
- int thisTaskIndex = context.getThisTaskIndex();
- int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
- Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size() / totalTaskCount + 1);
- for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) {
- myPartitions.add(allPartitions.get(i));
- }
- return new ArrayList<>(myPartitions);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
deleted file mode 100644
index 9c5a8c4..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.io.Serializable;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.task.TopologyContext;
-
-/**
- * A subscription to kafka.
- */
-public abstract class Subscription implements Serializable {
- private static final long serialVersionUID = -216136367240198716L;
-
- /**
- * Subscribe the KafkaConsumer to the proper topics
- * @param consumer the Consumer to get.
- * @param listener the rebalance listener to include in the subscription
- */
- public abstract <K, V> void subscribe(KafkaConsumer<K,V> consumer, ConsumerRebalanceListener listener, TopologyContext context);
-
- /**
- * @return A human-readable string representing the subscribed topics.
- */
- public abstract String getTopicsString();
-
- /**
- * NOOP is the default behavior, which means that Kafka will internally handle partition assignment.
- * If you wish to do manual partition management, you must provide an implementation of this method
- * that will check with kafka for any changes and call the ConsumerRebalanceListener from subscribe
- * to inform the rest of the system of those changes.
- */
- public void refreshAssignment() {
- //NOOP
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
deleted file mode 100644
index 7631c8a..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-
-public interface TopicFilter extends Serializable {
-
- /**
- * Get the Kafka TopicPartitions passed by this filter.
- * @param consumer The Kafka consumer to use to read the list of existing partitions
- * @return The Kafka partitions passed by this filter.
- */
- List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer);
-
- /**
- * @return A human-readable string representing the topics that pass the filter.
- */
- String getTopicsString();
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
new file mode 100644
index 0000000..17512ea
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.TopicPartitionComparator;
+import org.apache.storm.task.TopologyContext;
+
+public class ManualPartitionSubscription extends Subscription {
+ private static final long serialVersionUID = 5633018073527583826L;
+ private final ManualPartitioner partitioner;
+ private final TopicFilter partitionFilter;
+ private Set<TopicPartition> currentAssignment = null;
+ private KafkaConsumer<?, ?> consumer = null;
+ private ConsumerRebalanceListener listener = null;
+ private TopologyContext context = null;
+
+ public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) {
+ this.partitionFilter = partitionFilter;
+ this.partitioner = parter;
+ }
+
+ @Override
+ public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
+ this.consumer = consumer;
+ this.listener = listener;
+ this.context = context;
+ refreshAssignment();
+ }
+
+ @Override
+ public void refreshAssignment() {
+ List<TopicPartition> allPartitions = partitionFilter.getFilteredTopicPartitions(consumer);
+ Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
+ Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context));
+ if (!newAssignment.equals(currentAssignment)) {
+ consumer.assign(newAssignment);
+ if (currentAssignment != null) {
+ listener.onPartitionsRevoked(currentAssignment);
+ }
+ currentAssignment = newAssignment;
+ listener.onPartitionsAssigned(newAssignment);
+ }
+ }
+
+ @Override
+ public String getTopicsString() {
+ return partitionFilter.getTopicsString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java
new file mode 100644
index 0000000..dce7fc6
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.util.List;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A function used to assign partitions to this spout.
+ * WARNING if this is not done correctly you can really mess things up, like not reading data in some partitions.
+ * The complete TopologyContext is passed in, but it is suggested that you use the index of the spout and the total
+ * number of spouts to avoid missing partitions or double assigning partitions.
+ */
+@FunctionalInterface
+public interface ManualPartitioner {
+ /**
+ * Get the partitions for this assignment
+ * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering
+ * @param context the context of the topology
+ * @return the subset of the partitions that this spout should use.
+ */
+ public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
new file mode 100644
index 0000000..d6e5fc2
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Filter that returns all partitions for the specified topics.
+ */
+public class NamedTopicFilter implements TopicFilter {
+
+ private final Set<String> topics;
+
+ /**
+ * Create filter based on a set of topic names.
+ * @param topics The topic names the filter will pass.
+ */
+ public NamedTopicFilter(Set<String> topics) {
+ this.topics = Collections.unmodifiableSet(topics);
+ }
+
+ /**
+ * Convenience constructor.
+ * @param topics The topic names the filter will pass.
+ */
+ public NamedTopicFilter(String... topics) {
+ this(new HashSet<>(Arrays.asList(topics)));
+ }
+
+ @Override
+ public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
+ List<TopicPartition> allPartitions = new ArrayList<>();
+ for (String topic : topics) {
+ for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) {
+ allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+ }
+ }
+ return allPartitions;
+ }
+
+ @Override
+ public String getTopicsString() {
+ return String.join(",", topics);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
new file mode 100644
index 0000000..98f8b23
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Filter that returns all partitions for topics matching the given {@link Pattern}.
+ */
+public class PatternTopicFilter implements TopicFilter {
+
+ private final Pattern pattern;
+ private final Set<String> topics = new HashSet<>();
+
+ /**
+ * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter.
+ *
+ * @param pattern The Pattern to use.
+ */
+ public PatternTopicFilter(Pattern pattern) {
+ this.pattern = pattern;
+ }
+
+ @Override
+ public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
+ topics.clear();
+ List<TopicPartition> allPartitions = new ArrayList<>();
+ for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
+ if (pattern.matcher(entry.getKey()).matches()) {
+ for (PartitionInfo partitionInfo : entry.getValue()) {
+ allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+ topics.add(partitionInfo.topic());
+ }
+ }
+ }
+ return allPartitions;
+ }
+
+ @Override
+ public String getTopicsString() {
+ return String.join(",", topics);
+ }
+
+ public String getTopicsPattern() {
+ return pattern.pattern();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java
new file mode 100644
index 0000000..9660c77
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * Assign partitions in a round robin fashion for all spouts,
+ * not just the ones that are alive. Because the parallelism of
+ * the spouts does not typically change while running this makes
+ * the assignments more stable in the face of crashing spouts.
+ * <p/>
+ * Round Robin means that first spout of N spouts will get the first
+ * partition, and the N+1th partition... The second spout will get the second partition and
+ * N+2th partition etc.
+ */
+public class RoundRobinManualPartitioner implements ManualPartitioner {
+
+ @Override
+ public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
+ int thisTaskIndex = context.getThisTaskIndex();
+ int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
+ Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size() / totalTaskCount + 1);
+ for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) {
+ myPartitions.add(allPartitions.get(i));
+ }
+ return new ArrayList<>(myPartitions);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
new file mode 100644
index 0000000..8091bfa
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.io.Serializable;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A subscription to kafka.
+ */
+public abstract class Subscription implements Serializable {
+ private static final long serialVersionUID = -216136367240198716L;
+
+ /**
+ * Subscribe the KafkaConsumer to the proper topics
+ * @param consumer the Consumer to get.
+ * @param listener the rebalance listener to include in the subscription
+ */
+ public abstract <K, V> void subscribe(KafkaConsumer<K,V> consumer, ConsumerRebalanceListener listener, TopologyContext context);
+
+ /**
+ * @return A human-readable string representing the subscribed topics.
+ */
+ public abstract String getTopicsString();
+
+ /**
+ * NOOP is the default behavior, which means that Kafka will internally handle partition assignment.
+ * If you wish to do manual partition management, you must provide an implementation of this method
+ * that will check with kafka for any changes and call the ConsumerRebalanceListener from subscribe
+ * to inform the rest of the system of those changes.
+ */
+ public void refreshAssignment() {
+ //NOOP
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
new file mode 100644
index 0000000..497e3ca
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+public interface TopicFilter extends Serializable {
+
+ /**
+ * Get the Kafka TopicPartitions passed by this filter.
+ * @param consumer The Kafka consumer to use to read the list of existing partitions
+ * @return The Kafka partitions passed by this filter.
+ */
+ List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer);
+
+ /**
+ * @return A human-readable string representing the topics that pass the filter.
+ */
+ String getTopicsString();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
index 8dc34d4..7258fe2 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
@@ -26,16 +26,15 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
@@ -50,53 +49,38 @@ public class KafkaSpoutCommitTest {
private final Map<String, Object> conf = new HashMap<>();
private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
private KafkaConsumer<String, String> consumerMock;
- private KafkaSpout<String, String> spout;
- private KafkaSpoutConfig spoutConfig;
+ private KafkaSpoutConfig<String, String> spoutConfig;
@Captor
private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
- private void setupSpout(Set<TopicPartition> assignedPartitions) {
+ @Before
+ public void setUp() {
MockitoAnnotations.initMocks(this);
spoutConfig = getKafkaSpoutConfigBuilder(-1)
- .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
- .build();
-
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .build();
consumerMock = mock(KafkaConsumer.class);
- KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
-
- //Set up a spout listening to 1 topic partition
- spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
- spout.open(conf, contextMock, collectorMock);
- spout.activate();
-
- ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
- verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
- //Assign partitions to the spout
- ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
- consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
}
@Test
public void testCommitSuccessWithOffsetVoids() {
//Verify that the commit logic can handle offset voids
try (SimulatedTime simulatedTime = new SimulatedTime()) {
- setupSpout(Collections.singleton(partition));
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
// Offsets emitted are 0,1,2,3,4,<void>,8,9
for (int i = 0; i < 5; i++) {
- recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
}
for (int i = 8; i < 10; i++) {
- recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
}
records.put(partition, recordsForPartition);
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(records));
+ .thenReturn(new ConsumerRecords<>(records));
for (int i = 0; i < recordsForPartition.size(); i++) {
spout.nextTuple();
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
index 24a2eda..8e6d390 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -16,7 +16,6 @@
package org.apache.storm.kafka.spout;
import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
-import static org.mockito.Matchers.anyCollection;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.inOrder;
@@ -32,18 +31,16 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
@@ -56,45 +53,30 @@ public class KafkaSpoutEmitTest {
private final Map<String, Object> conf = new HashMap<>();
private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
private KafkaConsumer<String, String> consumerMock;
- private KafkaSpout<String, String> spout;
- private KafkaSpoutConfig spoutConfig;
+ private KafkaSpoutConfig<String, String> spoutConfig;
- private void setupSpout(Set<TopicPartition> assignedPartitions) {
+ @Before
+ public void setUp() {
spoutConfig = getKafkaSpoutConfigBuilder(-1)
.setOffsetCommitPeriodMs(offsetCommitPeriodMs)
.build();
-
consumerMock = mock(KafkaConsumer.class);
- KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
-
- //Set up a spout listening to 1 topic partition
- spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
- spout.open(conf, contextMock, collectorMock);
- spout.activate();
-
- ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
- verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
- //Assign partitions to the spout
- ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
- consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
}
@Test
public void testNextTupleEmitsAtMostOneTuple() {
//The spout should emit at most one message per call to nextTuple
//This is necessary for Storm to be able to throttle the spout according to maxSpoutPending
- setupSpout(Collections.singleton(partition));
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
}
records.put(partition, recordsForPartition);
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(records));
+ .thenReturn(new ConsumerRecords<>(records));
spout.nextTuple();
@@ -107,17 +89,17 @@ public class KafkaSpoutEmitTest {
//Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed
try (SimulatedTime simulatedTime = new SimulatedTime()) {
- setupSpout(Collections.singleton(partition));
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
//This is cheating a bit since maxPollRecords would normally spread this across multiple polls
- recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
}
records.put(partition, recordsForPartition);
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(records));
+ .thenReturn(new ConsumerRecords<>(records));
for (int i = 0; i < recordsForPartition.size(); i++) {
spout.nextTuple();
@@ -172,13 +154,13 @@ public class KafkaSpoutEmitTest {
//Emit maxUncommittedOffsets messages, and fail only the last. Then ensure that the spout will allow no more than maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets when retrying
try (SimulatedTime simulatedTime = new SimulatedTime()) {
- setupSpout(Collections.singleton(partition));
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPollRecords = new HashMap<>();
List<ConsumerRecord<String, String>> firstPollRecordsForPartition = new ArrayList<>();
for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
//This is cheating a bit since maxPollRecords would normally spread this across multiple polls
- firstPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ firstPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
}
firstPollRecords.put(partition, firstPollRecordsForPartition);
@@ -186,13 +168,13 @@ public class KafkaSpoutEmitTest {
Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPollRecords = new HashMap<>();
List<ConsumerRecord<String, String>> secondPollRecordsForPartition = new ArrayList<>();
for(int i = 0; i < maxPollRecords; i++) {
- secondPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
+ secondPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
}
secondPollRecords.put(partition, secondPollRecordsForPartition);
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(firstPollRecords))
- .thenReturn(new ConsumerRecords(secondPollRecords));
+ .thenReturn(new ConsumerRecords<>(firstPollRecords))
+ .thenReturn(new ConsumerRecords<>(secondPollRecords));
for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() + maxPollRecords; i++) {
spout.nextTuple();