You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/02/08 02:11:36 UTC
[1/3] storm git commit: STORM-2281: Running Multiple Kafka Spouts
(Trident) Throws Illegal State Exception
Repository: storm
Updated Branches:
refs/heads/master d3250b22f -> 3e232e2fc
STORM-2281: Running Multiple Kafka Spouts (Trident) Throws Illegal State Exception
- Assign topic partitions to the tasks running the Kafka consumer instance that has been assigned the same list of topic partitions
- Improve logging
- Minor code refactoring
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/07cf86e9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/07cf86e9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/07cf86e9
Branch: refs/heads/master
Commit: 07cf86e95a66e47cac41f67f5b269ab24758d8cb
Parents: d3250b2
Author: Hugo Louro <hm...@gmail.com>
Authored: Mon Jan 30 00:02:42 2017 -0800
Committer: Hugo Louro <hm...@gmail.com>
Committed: Tue Feb 7 15:43:45 2017 -0800
----------------------------------------------------------------------
.../TridentKafkaClientWordCountNamedTopics.java | 44 ++--
.../storm/kafka/trident/LocalSubmitter.java | 7 +-
.../trident/TridentKafkaConsumerTopology.java | 20 +-
.../trident/KafkaTridentSpoutBatchMetadata.java | 8 +-
.../spout/trident/KafkaTridentSpoutEmitter.java | 206 +++++++++++++------
.../spout/trident/KafkaTridentSpoutManager.java | 85 ++++----
.../spout/trident/KafkaTridentSpoutOpaque.java | 17 +-
.../KafkaTridentSpoutOpaqueCoordinator.java | 4 +-
.../KafkaTridentSpoutTopicPartition.java | 2 +-
...KafkaTridentSpoutTopicPartitionRegistry.java | 4 +-
10 files changed, 237 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
index 2d08f6c..edd1f09 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
@@ -18,16 +18,13 @@
package org.apache.storm.kafka.trident;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.kafka.spout.Func;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
@@ -36,6 +33,13 @@ import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
public class TridentKafkaClientWordCountNamedTopics {
private static final String TOPIC_1 = "test-trident";
private static final String TOPIC_2 = "test-trident-1";
@@ -45,11 +49,23 @@ public class TridentKafkaClientWordCountNamedTopics {
return new KafkaTridentSpoutOpaque<>(newKafkaSpoutConfig());
}
+ private static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new JustValueFunc();
+
+ /**
+ * Needs to be serializable
+ */
+ private static class JustValueFunc implements Func<ConsumerRecord<String, String>, List<Object>>, Serializable {
+ @Override
+ public List<Object> apply(ConsumerRecord<String, String> record) {
+ return new Values(record.value());
+ }
+ }
+
protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() {
return KafkaSpoutConfig.builder(KAFKA_LOCAL_BROKER, TOPIC_1, TOPIC_2)
- .setGroupId("kafkaSpoutTestGroup")
+ .setGroupId("kafkaSpoutTestGroup_" + System.nanoTime())
.setMaxPartitionFectchBytes(200)
- .setRecordTranslator((r) -> new Values(r.value()), new Fields("str"))
+ .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
.setRetry(newRetryService())
.setOffsetCommitPeriodMs(10_000)
.setFirstPollOffsetStrategy(EARLIEST)
@@ -77,7 +93,7 @@ public class TridentKafkaClientWordCountNamedTopics {
System.out.printf("Running with broker_url: [%s], topics: [%s, %s]\n", brokerUrl, topic1, topic2);
- Config tpConf = LocalSubmitter.defaultConfig();
+ Config tpConf = LocalSubmitter.defaultConfig(true);
if (args.length == 4) { //Submit Remote
// Producers
@@ -102,11 +118,15 @@ public class TridentKafkaClientWordCountNamedTopics {
localSubmitter.submit(topic1Tp, tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic1));
localSubmitter.submit(topic2Tp, tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic2));
// Consumer
- localSubmitter.submit(consTpName, tpConf, TridentKafkaConsumerTopology.newTopology(
- localSubmitter.getDrpc(), newKafkaTridentSpoutOpaque()));
+ try {
+ localSubmitter.submit(consTpName, tpConf, TridentKafkaConsumerTopology.newTopology(
+ localSubmitter.getDrpc(), newKafkaTridentSpoutOpaque()));
+ // print
+ localSubmitter.printResults(15, 1, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
- // print
- new DrpcResultsPrinter(localSubmitter.getDrpc()).printResults(60, 1, TimeUnit.SECONDS);
} finally {
// kill
localSubmitter.kill(topic1Tp);
http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java
index 54ec99c..9666695 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java
@@ -47,10 +47,13 @@ public class LocalSubmitter {
}
public static Config defaultConfig() {
+ return defaultConfig(false);
+ }
+
+ public static Config defaultConfig(boolean debug) {
final Config conf = new Config();
conf.setMaxSpoutPending(20);
- conf.setMaxTaskParallelism(1);
- conf.setNumWorkers(1);
+ conf.setDebug(debug);
return conf;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
index 4669f52..a39eba1 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
@@ -18,9 +18,7 @@
package org.apache.storm.kafka.trident;
-import org.apache.storm.Config;
import org.apache.storm.LocalDRPC;
-import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.starter.trident.DebugMemoryMapState;
import org.apache.storm.trident.Stream;
@@ -42,14 +40,6 @@ import org.slf4j.LoggerFactory;
public class TridentKafkaConsumerTopology {
protected static final Logger LOG = LoggerFactory.getLogger(TridentKafkaConsumerTopology.class);
- public static void submitRemote(String name, ITridentDataSource tridentSpout) {
- try {
- StormSubmitter.submitTopology(name, newTpConfig(), newTopology(null, tridentSpout));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
/**
* See {@link TridentKafkaConsumerTopology#newTopology(LocalDRPC, ITridentDataSource)}
*/
@@ -85,19 +75,11 @@ public class TridentKafkaConsumerTopology {
}
private static TridentState addTridentState(TridentTopology tridentTopology, ITridentDataSource tridentSpout) {
- final Stream spoutStream = tridentTopology.newStream("spout1", tridentSpout).parallelismHint(1);
+ final Stream spoutStream = tridentTopology.newStream("spout1", tridentSpout).parallelismHint(2);
return spoutStream.each(spoutStream.getOutputFields(), new Debug(true))
.each(new Fields("str"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new DebugMemoryMapState.Factory(), new Count(), new Fields("count"));
}
-
- private static Config newTpConfig() {
- Config conf = new Config();
- conf.setMaxSpoutPending(20);
- conf.setMaxTaskParallelism(1);
- return conf;
- }
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
index 2faf52a..18a2246 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
@@ -34,8 +34,8 @@ public class KafkaTridentSpoutBatchMetadata<K,V> implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutBatchMetadata.class);
private TopicPartition topicPartition; // topic partition of this batch
- private long firstOffset; // first offset of this batch
- private long lastOffset; // last offset of this batch
+ private long firstOffset; // first offset of this batch
+ private long lastOffset; // last offset of this batch
public KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, long firstOffset, long lastOffset) {
this.topicPartition = topicPartition;
@@ -74,8 +74,8 @@ public class KafkaTridentSpoutBatchMetadata<K,V> implements Serializable {
@Override
public String toString() {
- return "KafkaTridentSpoutBatchMetadata{" +
- "topicPartition=" + topicPartition +
+ return super.toString() +
+ "{topicPartition=" + topicPartition +
", firstOffset=" + firstOffset +
", lastOffset=" + lastOffset +
'}';
http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index 19b4f01..79dfc60 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -18,20 +18,6 @@
package org.apache.storm.kafka.spout.trident;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -47,9 +33,28 @@ import org.apache.storm.trident.topology.TransactionAttempt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentSpout.Emitter<List<TopicPartition>, KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>>, Serializable {
- private static final long serialVersionUID = -7343927794834130435L;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTridentSpout.Emitter<
+ List<TopicPartition>,
+ KafkaTridentSpoutTopicPartition,
+ KafkaTridentSpoutBatchMetadata<K, V>>,
+ Serializable {
+
+ private static final long serialVersionUID = -7343927794834130435L;
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
// Kafka
@@ -57,60 +62,81 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
// Bookkeeping
private final KafkaTridentSpoutManager<K, V> kafkaManager;
+
// Declare some KafkaTridentSpoutManager references for convenience
private final long pollTimeoutMs;
private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
private final RecordTranslator<K, V> translator;
private final Timer refreshSubscriptionTimer;
- public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, TopologyContext context) {
- this.kafkaManager = kafkaManager;
- this.kafkaManager.subscribeKafkaConsumer(context);
- refreshSubscriptionTimer = new Timer(500, kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
+ private TopologyContext topologyContext;
- //must subscribeKafkaConsumer before this line
- kafkaConsumer = kafkaManager.getKafkaConsumer();
- translator = kafkaManager.getKafkaSpoutConfig().getTranslator();
+ public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
+ this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
+ this.kafkaManager = kafkaManager;
+ this.topologyContext = topologyContext;
+ this.refreshSubscriptionTimer = refreshSubscriptionTimer;
+ this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator();
final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
- pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
- firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
+ this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
+ this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
LOG.debug("Created {}", this);
}
+ /**
+ * Creates instance of this class with default 500 millisecond refresh subscription timer
+ */
+ public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, TopologyContext topologyContext) {
+ this(kafkaManager, topologyContext, new Timer(500,
+ kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS));
+ }
+
@Override
public KafkaTridentSpoutBatchMetadata<K, V> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
- KafkaTridentSpoutTopicPartition partitionTs, KafkaTridentSpoutBatchMetadata<K, V> lastBatch) {
- LOG.debug("Emitting batch: [transaction = {}], [partition = {}], [collector = {}], [lastBatchMetadata = {}]",
- tx, partitionTs, collector, lastBatch);
+ KafkaTridentSpoutTopicPartition currBatchPartition, KafkaTridentSpoutBatchMetadata<K, V> lastBatch) {
- final TopicPartition topicPartition = partitionTs.getTopicPartition();
+ LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
+ tx, currBatchPartition, lastBatch, collector);
+
+ final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
+ final Set<TopicPartition> assignments = kafkaConsumer.assignment();
KafkaTridentSpoutBatchMetadata<K, V> currentBatch = lastBatch;
Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();
- try {
- // pause other topic partitions to only poll from current topic partition
- pausedTopicPartitions = pauseTopicPartitions(topicPartition);
+ if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) {
+ LOG.warn("SKIPPING processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
+ "[collector = {}] because it is not assigned {} to consumer instance [{}] of consumer group [{}]",
+ tx, currBatchPartition, lastBatch, collector, assignments, kafkaConsumer,
+ kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
+ } else {
+ try {
+ // pause other topic-partitions to only poll from current topic-partition
+ pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
- seek(topicPartition, lastBatch);
+ seek(currBatchTp, lastBatch);
- // poll
- if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
- kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
- }
- final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
- LOG.debug("Polled [{}] records from Kafka.", records.count());
+ // poll
+ if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
+ kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
+ }
+
+ final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
+ LOG.debug("Polled [{}] records from Kafka.", records.count());
- if (!records.isEmpty()) {
- emitTuples(collector, records);
- // build new metadata
- currentBatch = new KafkaTridentSpoutBatchMetadata<>(topicPartition, records, lastBatch);
+ if (!records.isEmpty()) {
+ emitTuples(collector, records);
+ // build new metadata
+ currentBatch = new KafkaTridentSpoutBatchMetadata<>(currBatchTp, records, lastBatch);
+ }
+ } finally {
+ kafkaConsumer.resume(pausedTopicPartitions);
+ LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
}
- } finally {
- kafkaConsumer.resume(pausedTopicPartitions);
- LOG.trace("Resumed topic partitions [{}]", pausedTopicPartitions);
+ LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
+ "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
}
- LOG.debug("Current batch metadata {}", currentBatch);
+
return currentBatch;
}
@@ -118,7 +144,7 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
for (ConsumerRecord<K, V> record : records) {
final List<Object> tuple = translator.apply(record);
collector.emit(tuple);
- LOG.debug("Emitted tuple [{}] for record: [{}]", tuple, record);
+ LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
}
}
@@ -135,7 +161,6 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
if (lastBatchMeta != null) {
kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch
LOG.debug("Seeking fetch offset to next offset after last offset from previous batch");
-
} else {
LOG.debug("Seeking fetch offset from firstPollOffsetStrategy and last commit to Kafka");
final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
@@ -161,31 +186,86 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
return fetchOffset;
}
- // returns paused topic partitions
+ // returns paused topic-partitions.
private Collection<TopicPartition> pauseTopicPartitions(TopicPartition excludedTp) {
- final Set<TopicPartition> pausedTopicPartitions = new HashSet<>(kafkaConsumer.assignment());
- LOG.debug("Currently assigned topic partitions [{}]", pausedTopicPartitions);
+ final Set<TopicPartition> pausedTopicPartitions = new HashSet<>(kafkaConsumer.assignment());
+ LOG.debug("Currently assigned topic-partitions {}", pausedTopicPartitions);
pausedTopicPartitions.remove(excludedTp);
kafkaConsumer.pause(pausedTopicPartitions);
- LOG.trace("Paused topic partitions [{}]", pausedTopicPartitions);
+ LOG.debug("Paused topic-partitions {}", pausedTopicPartitions);
return pausedTopicPartitions;
}
@Override
public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
- LOG.debug("Refreshing topic partitions [{}]", partitionResponsibilities);
+ LOG.trace("Refreshing of topic-partitions handled by Kafka. " +
+ "No action taken by this method for topic partitions {}", partitionResponsibilities);
}
+ /**
+ * Computes ordered list of topic-partitions for this task taking into consideration that topic-partitions
+ * for this task must be assigned to the Kafka consumer running on this task.
+ * @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator}
+ * @return ordered list of topic partitions for this task
+ */
@Override
- public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(List<TopicPartition> allPartitionInfo) {
- final List<KafkaTridentSpoutTopicPartition> topicPartitionsTrident = new ArrayList<>(allPartitionInfo == null ? 0 : allPartitionInfo.size());
- if (allPartitionInfo != null) {
- for (TopicPartition topicPartition : allPartitionInfo) {
- topicPartitionsTrident.add(new KafkaTridentSpoutTopicPartition(topicPartition));
+ public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<TopicPartition> allPartitionInfo) {
+ final int numTopicPartitions = allPartitionInfo == null ? 0 : allPartitionInfo.size();
+ final int taskIndex = topologyContext.getThisTaskIndex();
+ final int numTasks = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
+
+ LOG.debug("Computing task ordered list of topic-partitions from all partitions list {}, " +
+ "for task with index [{}] of total tasks [{}] ", allPartitionInfo, taskIndex, numTasks);
+
+ final Set<TopicPartition> assignment = kafkaConsumer.assignment();
+ LOG.debug("Consumer [{}] has assigned topic-partitions {}", kafkaConsumer, assignment);
+
+ List<KafkaTridentSpoutTopicPartition> taskOrderedTps = new ArrayList<>(numTopicPartitions);
+
+ if (numTopicPartitions > 0) {
+ final KafkaTridentSpoutTopicPartition[] tps = new KafkaTridentSpoutTopicPartition[numTopicPartitions];
+ int tpTaskComputedIdx = taskIndex;
+ /*
+ * Put this task's Kafka consumer assigned topic-partitions in the right index locations such
+ * that distribution by OpaquePartitionedTridentSpoutExecutor can be done correctly. This algorithm
+ * does the distribution in exactly the same way as the one used in OpaquePartitionedTridentSpoutExecutor
+ */
+ for (TopicPartition assignedTp : assignment) {
+ if (tpTaskComputedIdx >= numTopicPartitions) {
+ LOG.warn("Ignoring attempt to add consumer [{}] assigned topic-partition [{}] to index [{}], " +
+ "out of bounds [{}]. ", kafkaConsumer, assignedTp, tpTaskComputedIdx, numTopicPartitions);
+ break;
+ }
+ tps[tpTaskComputedIdx] = new KafkaTridentSpoutTopicPartition(assignedTp);
+ LOG.debug("Added consumer assigned topic-partition [{}] to position [{}] for task with index [{}]",
+ assignedTp, tpTaskComputedIdx, taskIndex);
+ tpTaskComputedIdx += numTasks;
+ }
+
+ // Put topic-partitions assigned to consumer instances running in different tasks in the empty slots
+ int i = 0;
+ for (TopicPartition tp : allPartitionInfo) {
+ /*
+ * Topic-partition not assigned to the Kafka consumer associated with this emitter task, hence not yet
+ * added to the list of task ordered partitions. To be processed next.
+ */
+ if (!assignment.contains(tp)) {
+ for (; i < numTopicPartitions; i++) {
+ if (tps[i] == null) { // find empty slot to put the topic-partition
+ tps[i] = new KafkaTridentSpoutTopicPartition(tp);
+ LOG.debug("Added to position [{}] topic-partition [{}], which is assigned to a consumer " +
+ "running on a task other than task with index [{}] ", i, tp, taskIndex);
+ i++;
+ break;
+ }
+ }
+ }
}
+ taskOrderedTps = Arrays.asList(tps);
}
- LOG.debug("OrderedPartitions = {}", topicPartitionsTrident);
- return topicPartitionsTrident;
+ LOG.debug("Returning ordered list of topic-partitions {} for task with index [{}], of total tasks [{}] ",
+ taskOrderedTps, taskIndex, numTasks);
+ return taskOrderedTps;
}
@Override
@@ -196,8 +276,8 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
@Override
public String toString() {
- return "KafkaTridentSpoutEmitter{" +
- ", kafkaManager=" + kafkaManager +
+ return super.toString() +
+ "{kafkaManager=" + kafkaManager +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
index 4b60f33..4054b49 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
@@ -18,22 +18,20 @@
package org.apache.storm.kafka.spout.trident;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.RecordTranslator;
-import org.apache.storm.kafka.spout.internal.Timer;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+
public class KafkaTridentSpoutManager<K, V> implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutManager.class);
@@ -43,50 +41,20 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
// Bookkeeping
private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
// Declare some KafkaSpoutConfig references for convenience
- private final Fields fields;
+ private Fields fields;
public KafkaTridentSpoutManager(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
this.kafkaSpoutConfig = kafkaSpoutConfig;
- RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
- Fields fields = null;
- for (String stream: translator.streams()) {
- if (fields == null) {
- fields = translator.getFieldsFor(stream);
- } else {
- if (!fields.equals(translator.getFieldsFor(stream))) {
- throw new IllegalArgumentException("Trident Spouts do not support multiple output Fields");
- }
- }
- }
- this.fields = fields;
+ this.fields = getFields();
LOG.debug("Created {}", this);
}
- void subscribeKafkaConsumer(TopologyContext context) {
+ KafkaConsumer<K,V> createAndSubscribeKafkaConsumer(TopologyContext context) {
kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context);
-
- // Initial poll to get the consumer registration process going.
- // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
- kafkaConsumer.poll(0);
- }
-
- private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
- LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
- kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
- KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.removeAll(partitions);
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.addAll(partitions);
- LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
- kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
- }
+ return kafkaConsumer;
}
KafkaConsumer<K, V> getKafkaConsumer() {
@@ -96,8 +64,23 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
Set<TopicPartition> getTopicPartitions() {
return KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.getTopicPartitions();
}
-
+
Fields getFields() {
+ if (fields == null) {
+ RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
+ Fields fs = null;
+ for (String stream : translator.streams()) {
+ if (fs == null) {
+ fs = translator.getFieldsFor(stream);
+ } else {
+ if (!fs.equals(translator.getFieldsFor(stream))) {
+ throw new IllegalArgumentException("Trident Spouts do not support multiple output Fields");
+ }
+ }
+ }
+ fields = fs;
+ }
+ LOG.debug("OutputFields = {}", fields);
return fields;
}
@@ -107,9 +90,25 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
@Override
public String toString() {
- return "KafkaTridentSpoutManager{" +
- "kafkaConsumer=" + kafkaConsumer +
+ return super.toString() +
+ "{kafkaConsumer=" + kafkaConsumer +
", kafkaSpoutConfig=" + kafkaSpoutConfig +
'}';
}
+
+ private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
+ kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+ KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.removeAll(partitions);
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.addAll(partitions);
+ LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
+ kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
index 5c5856c..18d37d9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
@@ -34,7 +34,7 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
- private KafkaTridentSpoutManager<K, V> kafkaManager;
+ private final KafkaTridentSpoutManager<K, V> kafkaManager;
private KafkaTridentSpoutEmitter<K, V> kafkaTridentSpoutEmitter;
private KafkaTridentSpoutOpaqueCoordinator<K, V> coordinator;
@@ -50,19 +50,12 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
@Override
public Emitter<List<TopicPartition>, KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>> getEmitter(Map conf, TopologyContext context) {
- // Instance is created on first call rather than in constructor to avoid NotSerializableException caused by KafkaConsumer
- if (kafkaTridentSpoutEmitter == null) {
- kafkaTridentSpoutEmitter = new KafkaTridentSpoutEmitter<>(kafkaManager, context);
- }
- return kafkaTridentSpoutEmitter;
+ return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
}
@Override
public Coordinator<List<TopicPartition>> getCoordinator(Map conf, TopologyContext context) {
- if (coordinator == null) {
- coordinator = new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
- }
- return coordinator;
+ return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
}
@Override
@@ -79,8 +72,8 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
@Override
public String toString() {
- return "KafkaTridentSpoutOpaque{" +
- "kafkaManager=" + kafkaManager +
+ return super.toString() +
+ "{kafkaManager=" + kafkaManager +
", kafkaTridentSpoutEmitter=" + kafkaTridentSpoutEmitter +
", coordinator=" + coordinator +
'}';
http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
index 6e85735..7898b6e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
@@ -57,8 +57,8 @@ public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartition
@Override
public String toString() {
- return "KafkaTridentSpoutOpaqueCoordinator{" +
- "kafkaManager=" + kafkaManager +
+ return super.toString() +
+ "{kafkaManager=" + kafkaManager +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
index ba6126c..b020bea 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
@@ -43,7 +43,7 @@ public class KafkaTridentSpoutTopicPartition implements ISpoutPartition, Seriali
@Override
public String getId() {
- return topicPartition.topic() + "/" + topicPartition.partition();
+ return topicPartition.topic() + "@" + topicPartition.partition();
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
index ee5220e..2d50ca7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.Set;
public enum KafkaTridentSpoutTopicPartitionRegistry {
@@ -31,7 +31,7 @@ public enum KafkaTridentSpoutTopicPartitionRegistry {
private Set<TopicPartition> topicPartitions;
KafkaTridentSpoutTopicPartitionRegistry() {
- this.topicPartitions = new HashSet<>();
+ this.topicPartitions = new LinkedHashSet<>();
}
public Set<TopicPartition> getTopicPartitions() {
[3/3] storm git commit: STORM-2281: CHANGELOG
Posted by ka...@apache.org.
STORM-2281: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3e232e2f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3e232e2f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3e232e2f
Branch: refs/heads/master
Commit: 3e232e2fc70c8dcc1fa1f79e0e2b50dce37072b0
Parents: 4691626
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Feb 8 11:11:24 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Feb 8 11:11:24 2017 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3e232e2f/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e705c93..c7fb4c8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -197,6 +197,7 @@
* STORM-1769: Added a test to check local nimbus with notifier plugin
## 1.1.0
+ * STORM-2281: Running Multiple Kafka Spouts (Trident) Throws Illegal State Exception
* STORM-2296: Kafka spout no dup on leader changes
* STORM-2014: New Kafka spout duplicates checking if failed messages have reached max retries
* STORM-1443: [Storm SQL] Support customizing parallelism in StormSQL
[2/3] storm git commit: Merge branch
'Apache_master_STORM-2281_TridentParellism2+' of
https://github.com/hmcl/storm-apache into STORM-2281-merge
Posted by ka...@apache.org.
Merge branch 'Apache_master_STORM-2281_TridentParellism2+' of https://github.com/hmcl/storm-apache into STORM-2281-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/46916263
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/46916263
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/46916263
Branch: refs/heads/master
Commit: 469162639ece6e774f1e526c91b799b6cc85daec
Parents: d3250b2 07cf86e
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Feb 8 10:25:00 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Feb 8 10:25:00 2017 +0900
----------------------------------------------------------------------
.../TridentKafkaClientWordCountNamedTopics.java | 44 ++--
.../storm/kafka/trident/LocalSubmitter.java | 7 +-
.../trident/TridentKafkaConsumerTopology.java | 20 +-
.../trident/KafkaTridentSpoutBatchMetadata.java | 8 +-
.../spout/trident/KafkaTridentSpoutEmitter.java | 206 +++++++++++++------
.../spout/trident/KafkaTridentSpoutManager.java | 85 ++++----
.../spout/trident/KafkaTridentSpoutOpaque.java | 17 +-
.../KafkaTridentSpoutOpaqueCoordinator.java | 4 +-
.../KafkaTridentSpoutTopicPartition.java | 2 +-
...KafkaTridentSpoutTopicPartitionRegistry.java | 4 +-
10 files changed, 237 insertions(+), 160 deletions(-)
----------------------------------------------------------------------