You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/08 14:25:15 UTC
[1/2] storm git commit: STORM-2675: Fix storm-kafka-client Trident
spout failing to serialize meta objects to Zookeeper
Repository: storm
Updated Branches:
refs/heads/master 306f399e9 -> ea0e465ab
STORM-2675: Fix storm-kafka-client Trident spout failing to serialize meta objects to Zookeeper
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/54a829ce
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/54a829ce
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/54a829ce
Branch: refs/heads/master
Commit: 54a829ceba6c1575d0665721509889e4b60dd066
Parents: e1dd247
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Aug 4 02:53:42 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Aug 30 18:54:08 2017 +0200
----------------------------------------------------------------------
examples/storm-kafka-client-examples/pom.xml | 11 +-
examples/storm-kafka-examples/pom.xml | 11 +-
external/storm-kafka-client/pom.xml | 2 +-
.../kafka/spout/subscription/Subscription.java | 5 +-
.../trident/KafkaTridentSpoutBatchMetadata.java | 77 +++++++++----
.../spout/trident/KafkaTridentSpoutEmitter.java | 108 +++++++++++--------
.../spout/trident/KafkaTridentSpoutManager.java | 10 +-
.../spout/trident/KafkaTridentSpoutOpaque.java | 12 +--
.../KafkaTridentSpoutOpaqueCoordinator.java | 17 +--
.../spout/trident/TopicPartitionSerializer.java | 47 ++++++++
.../storm/kafka/trident/TridentKafkaState.java | 9 ++
.../storm/kafka/spout/KafkaSpoutConfigTest.java | 2 +-
.../SpoutWithMockedConsumerSetupHelper.java | 33 ++++--
.../KafkaTridentSpoutBatchMetadataTest.java | 66 ++++++++++++
.../spout/IOpaquePartitionedTridentSpout.java | 22 +++-
.../topology/state/TransactionalState.java | 12 ++-
16 files changed, 334 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/examples/storm-kafka-client-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/pom.xml b/examples/storm-kafka-client-examples/pom.xml
index af3eb9d..c6836d0 100644
--- a/examples/storm-kafka-client-examples/pom.xml
+++ b/examples/storm-kafka-client-examples/pom.xml
@@ -28,11 +28,6 @@
</parent>
<modelVersion>4.0.0</modelVersion>
-
- <properties>
- <!-- Override with -Dkafka.dependency.scope=provided to generate a jar without dependencies -->
- <kafka.dependency.scope>compile</kafka.dependency.scope>
- </properties>
<artifactId>storm-kafka-client-examples</artifactId>
@@ -47,19 +42,19 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>${project.version}</version>
- <scope>${kafka.dependency.scope}</scope>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>${storm.kafka.artifact.id}</artifactId>
<version>${storm.kafka.client.version}</version>
- <scope>${kafka.dependency.scope}</scope>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${storm.kafka.client.version}</version>
- <scope>${kafka.dependency.scope}</scope>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/examples/storm-kafka-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/pom.xml b/examples/storm-kafka-examples/pom.xml
index eb7bd22..13b5573 100644
--- a/examples/storm-kafka-examples/pom.xml
+++ b/examples/storm-kafka-examples/pom.xml
@@ -24,11 +24,6 @@
<version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
-
- <properties>
- <!-- Override with -Dkafka.dependency.scope=provided to generate a jar without dependencies -->
- <kafka.dependency.scope>compile</kafka.dependency.scope>
- </properties>
<artifactId>storm-kafka-examples</artifactId>
@@ -43,19 +38,19 @@
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>${project.version}</version>
- <scope>${kafka.dependency.scope}</scope>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>${storm.kafka.artifact.id}</artifactId>
<version>${storm.kafka.version}</version>
- <scope>${kafka.dependency.scope}</scope>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${storm.kafka.version}</version>
- <scope>${kafka.dependency.scope}</scope>
+ <scope>compile</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 2f17ea1..c3bd457 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -149,7 +149,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
- <maxAllowedViolations>6</maxAllowedViolations>
+ <maxAllowedViolations>0</maxAllowedViolations>
</configuration>
</plugin>
</plugins>
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
index 8091bfa..55e1c63 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
@@ -30,7 +30,10 @@ public abstract class Subscription implements Serializable {
private static final long serialVersionUID = -216136367240198716L;
/**
- * Subscribe the KafkaConsumer to the proper topics
+ * Subscribe the KafkaConsumer to the proper topics.
+ * Implementations must ensure that a given topic partition is always assigned to the same spout task.
+ * Adding and removing partitions as necessary is fine, but partitions must not move from one task to another.
+ * This constraint is only important for use with the Trident spout.
* @param consumer the Consumer to get.
* @param listener the rebalance listener to include in the subscription
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
index df56016..9ba76d7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
@@ -19,7 +19,10 @@
package org.apache.storm.kafka.spout.trident;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang.Validate;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
@@ -29,38 +32,47 @@ import org.slf4j.LoggerFactory;
/**
* Wraps transaction batch information.
*/
-public class KafkaTridentSpoutBatchMetadata<K,V> implements Serializable {
+public class KafkaTridentSpoutBatchMetadata implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutBatchMetadata.class);
-
+ private static final TopicPartitionSerializer TP_SERIALIZER = new TopicPartitionSerializer();
+
+ public static final String TOPIC_PARTITION_KEY = "tp";
+ public static final String FIRST_OFFSET_KEY = "firstOffset";
+ public static final String LAST_OFFSET_KEY = "lastOffset";
+
// topic partition of this batch
- private TopicPartition topicPartition;
+ private final TopicPartition topicPartition;
// first offset of this batch
- private long firstOffset;
+ private final long firstOffset;
// last offset of this batch
- private long lastOffset;
+ private final long lastOffset;
+ /**
+ * Builds a metadata object.
+ * @param topicPartition The topic partition
+ * @param firstOffset The first offset for the batch
+ * @param lastOffset The last offset for the batch
+ */
public KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, long firstOffset, long lastOffset) {
this.topicPartition = topicPartition;
this.firstOffset = firstOffset;
this.lastOffset = lastOffset;
}
- public KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, ConsumerRecords<K, V> consumerRecords,
- KafkaTridentSpoutBatchMetadata<K, V> lastBatch) {
- this.topicPartition = topicPartition;
-
+ /**
+ * Builds a metadata object from a non-empty set of records.
+ * @param topicPartition The topic partition the records belong to.
+ * @param consumerRecords The non-empty set of records.
+ */
+ public <K, V> KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, ConsumerRecords<K, V> consumerRecords) {
+ Validate.notNull(consumerRecords.records(topicPartition));
List<ConsumerRecord<K, V>> records = consumerRecords.records(topicPartition);
-
- if (records != null && !records.isEmpty()) {
- firstOffset = records.get(0).offset();
- lastOffset = records.get(records.size() - 1).offset();
- } else {
- if (lastBatch != null) {
- firstOffset = lastBatch.firstOffset;
- lastOffset = lastBatch.lastOffset;
- }
- }
- LOG.debug("Created {}", this);
+ Validate.isTrue(!records.isEmpty(), "There must be at least one record in order to build metadata");
+
+ this.topicPartition = topicPartition;
+ firstOffset = records.get(0).offset();
+ lastOffset = records.get(records.size() - 1).offset();
+ LOG.debug("Created {}", this.toString());
}
public long getFirstOffset() {
@@ -74,9 +86,32 @@ public class KafkaTridentSpoutBatchMetadata<K,V> implements Serializable {
public TopicPartition getTopicPartition() {
return topicPartition;
}
+
+ /**
+ * Constructs a metadata object from a Map in the format produced by {@link #toMap() }.
+ * @param map The source map
+ * @return A new metadata object
+ */
+ public static KafkaTridentSpoutBatchMetadata fromMap(Map<String, Object> map) {
+ Map<String, Object> topicPartitionMap = (Map<String, Object>)map.get(TOPIC_PARTITION_KEY);
+ TopicPartition tp = TP_SERIALIZER.fromMap(topicPartitionMap);
+ return new KafkaTridentSpoutBatchMetadata(tp, ((Number)map.get(FIRST_OFFSET_KEY)).longValue(),
+ ((Number)map.get(LAST_OFFSET_KEY)).longValue());
+ }
+
+ /**
+ * Writes this metadata object to a Map so Trident can read/write it to Zookeeper.
+ */
+ public Map<String, Object> toMap() {
+ Map<String, Object> map = new HashMap<>();
+ map.put(TOPIC_PARTITION_KEY, TP_SERIALIZER.toMap(topicPartition));
+ map.put(FIRST_OFFSET_KEY, firstOffset);
+ map.put(LAST_OFFSET_KEY, lastOffset);
+ return map;
+ }
@Override
- public String toString() {
+ public final String toString() {
return super.toString()
+ "{topicPartition=" + topicPartition
+ ", firstOffset=" + firstOffset
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index 5351f79..a45eff8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -27,14 +27,16 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.RecordTranslator;
@@ -47,9 +49,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTridentSpout.Emitter<
- List<TopicPartition>,
+ List<Map<String, Object>>,
KafkaTridentSpoutTopicPartition,
- KafkaTridentSpoutBatchMetadata<K, V>>,
+ Map<String, Object>>,
Serializable {
private static final long serialVersionUID = -7343927794834130435L;
@@ -60,16 +62,24 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
// Bookkeeping
private final KafkaTridentSpoutManager<K, V> kafkaManager;
- private Set<TopicPartition> firstPoll = new HashSet<>(); // set of topic-partitions for which first poll has already occurred
+ // set of topic-partitions for which first poll has already occurred, and the first polled txid
+ private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<>();
// Declare some KafkaTridentSpoutManager references for convenience
private final long pollTimeoutMs;
private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
private final RecordTranslator<K, V> translator;
private final Timer refreshSubscriptionTimer;
+ private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
private TopologyContext topologyContext;
+ /**
+ * Create a new Kafka spout emitter.
+ * @param kafkaManager The Kafka consumer manager to use
+ * @param topologyContext The topology context
+ * @param refreshSubscriptionTimer The timer for deciding when to recheck the subscription
+ */
public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext,
Timer refreshSubscriptionTimer) {
this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
@@ -81,7 +91,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
- LOG.debug("Created {}", this);
+ LOG.debug("Created {}", this.toString());
}
/**
@@ -93,15 +103,16 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
}
@Override
- public KafkaTridentSpoutBatchMetadata<K, V> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
- KafkaTridentSpoutTopicPartition currBatchPartition, KafkaTridentSpoutBatchMetadata<K, V> lastBatch) {
+ public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
+ KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {
LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
tx, currBatchPartition, lastBatch, collector);
final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
final Set<TopicPartition> assignments = kafkaConsumer.assignment();
- KafkaTridentSpoutBatchMetadata<K, V> currentBatch = lastBatch;
+ KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
+ KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();
if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) {
@@ -114,7 +125,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
// pause other topic-partitions to only poll from current topic-partition
pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
- seek(currBatchTp, lastBatch);
+ seek(currBatchTp, lastBatchMeta, tx.getTransactionId());
// poll
if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
@@ -127,7 +138,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
if (!records.isEmpty()) {
emitTuples(collector, records);
// build new metadata
- currentBatch = new KafkaTridentSpoutBatchMetadata<>(currBatchTp, records, lastBatch);
+ currentBatch = new KafkaTridentSpoutBatchMetadata(currBatchTp, records);
}
} finally {
kafkaConsumer.resume(pausedTopicPartitions);
@@ -137,7 +148,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
+ "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
}
- return currentBatch;
+ return currentBatch == null ? null : currentBatch.toMap();
}
private void emitTuples(TridentCollector collector, ConsumerRecords<K, V> records) {
@@ -149,46 +160,49 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
}
/**
- * Determines the offset of the next fetch. For failed batches lastBatchMeta is not null and contains the fetch
- * offset of the failed batch. In this scenario the next fetch will take place at offset of the failed batch + 1.
- * When the previous batch is successful, lastBatchMeta is null, and the offset of the next fetch is either the
- * offset of the last commit to kafka, or if no commit was yet made, the offset dictated by
- * {@link KafkaSpoutConfig.FirstPollOffsetStrategy}
+ * Determines the offset of the next fetch. Will use the firstPollOffsetStrategy if this is the first poll for the topic partition.
+ * Otherwise the next offset will be one past the last batch, based on lastBatchMeta.
+ *
+ * <p>lastBatchMeta should only be null when the previous txid was not emitted (e.g. new topic),
+ * it is the first poll for the spout instance, or it is a replay of the first txid this spout emitted on this partition.
+ * In the second case, there are either no previous transactions, or the MBC is still committing them
+ * and they will fail because this spout did not emit the corresponding batches. If it had emitted them, the meta could not be null.
+ * In any case, the lastBatchMeta should never be null if this is not the first poll for this spout instance.
*
* @return the offset of the next fetch
*/
- private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata<K, V> lastBatchMeta) {
- if (lastBatchMeta != null) {
- kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch
- LOG.debug("Seeking fetch offset to next offset after last offset from previous batch for topic-partition [{}]", tp);
- } else if (isFirstPoll(tp)) {
- LOG.debug("Seeking fetch offset from firstPollOffsetStrategy and last commit to Kafka for topic-partition [{}]", tp);
- firstPoll.add(tp);
- final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
- if (committedOffset != null) { // offset was committed for this TopicPartition
- if (firstPollOffsetStrategy.equals(EARLIEST)) {
- kafkaConsumer.seekToBeginning(Collections.singleton(tp));
- } else if (firstPollOffsetStrategy.equals(LATEST)) {
- kafkaConsumer.seekToEnd(Collections.singleton(tp));
- } else {
- // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
- kafkaConsumer.seek(tp, committedOffset.offset() + 1);
- }
- } else { // no commits have ever been done, so start at the beginning or end depending on the strategy
- if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
- kafkaConsumer.seekToBeginning(Collections.singleton(tp));
- } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
- kafkaConsumer.seekToEnd(Collections.singleton(tp));
- }
+ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta, long transactionId) {
+ if (isFirstPoll(tp, transactionId)) {
+ if (firstPollOffsetStrategy == EARLIEST) {
+ LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
+ kafkaConsumer.seekToBeginning(Collections.singleton(tp));
+ } else if (firstPollOffsetStrategy == LATEST) {
+ LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
+ kafkaConsumer.seekToEnd(Collections.singleton(tp));
+ } else if (lastBatchMeta != null) {
+ LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
+ kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch
+ } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
+ LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
+ kafkaConsumer.seekToBeginning(Collections.singleton(tp));
+ } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) {
+ LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp);
+ kafkaConsumer.seekToEnd(Collections.singleton(tp));
}
+ firstPollTransaction.put(tp, transactionId);
+ } else {
+ kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch
+ LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
}
+
final long fetchOffset = kafkaConsumer.position(tp);
LOG.debug("Set [fetchOffset = {}]", fetchOffset);
return fetchOffset;
}
- private boolean isFirstPoll(TopicPartition tp) {
- return !firstPoll.contains(tp);
+ private boolean isFirstPoll(TopicPartition tp, long txid) {
+ // The first poll is either the "real" first transaction, or a replay of the first transaction
+ return !firstPollTransaction.containsKey(tp) || firstPollTransaction.get(tp) == txid;
}
// returns paused topic-partitions.
@@ -215,15 +229,19 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
* @return ordered list of topic partitions for this task
*/
@Override
- public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<TopicPartition> allPartitionInfo) {
- final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allPartitionInfo);
+ public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map<String, Object>> allPartitionInfo) {
+ List<TopicPartition> allTopicPartitions = allPartitionInfo.stream()
+ .map(map -> tpSerializer.fromMap(map))
+ .collect(Collectors.toList());
+ final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allTopicPartitions);
LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
return allPartitions;
}
@Override
- public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks, List<TopicPartition> allPartitionInfo) {
+ public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
+ List<Map<String, Object>> allPartitionInfo) {
final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps);
@@ -253,7 +271,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
}
@Override
- public String toString() {
+ public final String toString() {
return super.toString()
+ "{kafkaManager=" + kafkaManager
+ '}';
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
index 30d52cb..b5138c2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
@@ -42,10 +42,14 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
// Declare some KafkaSpoutConfig references for convenience
private Fields fields;
+ /**
+ * Create a KafkaConsumer manager for the trident spout.
+ * @param kafkaSpoutConfig The consumer config
+ */
public KafkaTridentSpoutManager(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
this.kafkaSpoutConfig = kafkaSpoutConfig;
this.fields = getFields();
- LOG.debug("Created {}", this);
+ LOG.debug("Created {}", this.toString());
}
KafkaConsumer<K,V> createAndSubscribeKafkaConsumer(TopologyContext context) {
@@ -63,7 +67,7 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
return KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.getTopicPartitions();
}
- Fields getFields() {
+ final Fields getFields() {
if (fields == null) {
RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
Fields fs = null;
@@ -87,7 +91,7 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
}
@Override
- public String toString() {
+ public final String toString() {
return super.toString()
+ "{kafkaConsumer=" + kafkaConsumer
+ ", kafkaSpoutConfig=" + kafkaSpoutConfig
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
index 4f49c7d..8d33e39 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
@@ -28,8 +28,8 @@ import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSpout<List<TopicPartition>,
- KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>> {
+public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSpout<List<Map<String, Object>>,
+ KafkaTridentSpoutTopicPartition, Map<String, Object>> {
private static final long serialVersionUID = -8003272486566259640L;
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
@@ -42,17 +42,17 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager<K, V> kafkaManager) {
this.kafkaManager = kafkaManager;
- LOG.debug("Created {}", this);
+ LOG.debug("Created {}", this.toString());
}
@Override
- public Emitter<List<TopicPartition>, KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>> getEmitter(
+ public Emitter<List<Map<String, Object>>, KafkaTridentSpoutTopicPartition, Map<String, Object>> getEmitter(
Map<String, Object> conf, TopologyContext context) {
return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
}
@Override
- public Coordinator<List<TopicPartition>> getCoordinator(Map<String, Object> conf, TopologyContext context) {
+ public Coordinator<List<Map<String, Object>>> getCoordinator(Map<String, Object> conf, TopologyContext context) {
return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
}
@@ -69,7 +69,7 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
}
@Override
- public String toString() {
+ public final String toString() {
return super.toString()
+ "{kafkaManager=" + kafkaManager + '}';
}
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
index c61cdcd..17732c2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
@@ -21,20 +21,23 @@ package org.apache.storm.kafka.spout.trident;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartitionedTridentSpout.Coordinator<List<TopicPartition>>,
+public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartitionedTridentSpout.Coordinator<List<Map<String, Object>>>,
Serializable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class);
- private KafkaTridentSpoutManager<K,V> kafkaManager;
+ private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
+ private final KafkaTridentSpoutManager<K,V> kafkaManager;
public KafkaTridentSpoutOpaqueCoordinator(KafkaTridentSpoutManager<K, V> kafkaManager) {
this.kafkaManager = kafkaManager;
- LOG.debug("Created {}", this);
+ LOG.debug("Created {}", this.toString());
}
@Override
@@ -44,10 +47,12 @@ public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartition
}
@Override
- public List<TopicPartition> getPartitionsForBatch() {
+ public List<Map<String, Object>> getPartitionsForBatch() {
final ArrayList<TopicPartition> topicPartitions = new ArrayList<>(kafkaManager.getTopicPartitions());
LOG.debug("TopicPartitions for batch {}", topicPartitions);
- return topicPartitions;
+ return topicPartitions.stream()
+ .map(tp -> tpSerializer.toMap(tp))
+ .collect(Collectors.toList());
}
@Override
@@ -56,7 +61,7 @@ public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartition
}
@Override
- public String toString() {
+ public final String toString() {
return super.toString()
+ "{kafkaManager=" + kafkaManager
+ '}';
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
new file mode 100644
index 0000000..50e78f0
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
+
+public class TopicPartitionSerializer {
+
+ public static final String TOPIC_PARTITION_TOPIC_KEY = "topic";
+ public static final String TOPIC_PARTITION_PARTITION_KEY = "partition";
+
+ /**
+ * Serializes the given TopicPartition to Map so Trident can serialize it to JSON.
+ */
+ public Map<String, Object> toMap(TopicPartition topicPartition) {
+ Map<String, Object> topicPartitionMap = new HashMap<>();
+ topicPartitionMap.put(TOPIC_PARTITION_TOPIC_KEY, topicPartition.topic());
+ topicPartitionMap.put(TOPIC_PARTITION_PARTITION_KEY, topicPartition.partition());
+ return topicPartitionMap;
+ }
+
+ /**
+ * Deserializes the given map into a TopicPartition. The map keys are expected to be those produced by
+ * {@link #toMap(org.apache.kafka.common.TopicPartition)}.
+ */
+ public TopicPartition fromMap(Map<String, Object> map) {
+ return new TopicPartition((String) map.get(TOPIC_PARTITION_TOPIC_KEY),
+ ((Number) map.get(TOPIC_PARTITION_PARTITION_KEY)).intValue());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
index f967609..c221f9e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -66,12 +66,21 @@ public class TridentKafkaState implements State {
LOG.debug("commit is Noop.");
}
+ /**
+ * Prepare this State.
+ * @param options The KafkaProducer config.
+ */
public void prepare(Properties options) {
Objects.requireNonNull(mapper, "mapper can not be null");
Objects.requireNonNull(topicSelector, "topicSelector can not be null");
producer = new KafkaProducer(options);
}
+ /**
+ * Write the given tuples to Kafka.
+ * @param tuples The tuples to write.
+ * @param collector The Trident collector.
+ */
public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
String topic = null;
try {
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index 513db90..051d212 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -44,7 +44,7 @@ public class KafkaSpoutConfigTest {
}
@Test
- public void test_setEmitNullTuples_true_true() {
+ public void testSetEmitNullTuplesToTrue() {
final KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
.setEmitNullTuples(true)
.build();
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
index aa65d0f..a5d3c54 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -20,11 +20,13 @@ import static org.mockito.Matchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -33,9 +35,10 @@ import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
public class SpoutWithMockedConsumerSetupHelper {
-
+
/**
* Creates, opens and activates a KafkaSpout using a mocked consumer.
+ *
* @param <K> The Kafka key type
* @param <V> The Kafka value type
* @param spoutConfig The spout config to use
@@ -47,27 +50,45 @@ public class SpoutWithMockedConsumerSetupHelper {
* @return The spout
*/
public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf,
- TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) {
+ TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) {
Map<String, List<PartitionInfo>> partitionInfos = assignedPartitions.stream()
.map(tp -> new PartitionInfo(tp.topic(), tp.partition(), null, null, null))
.collect(Collectors.groupingBy(info -> info.topic()));
partitionInfos.keySet()
.forEach(key -> when(consumerMock.partitionsFor(key))
- .thenReturn(partitionInfos.get(key)));
+ .thenReturn(partitionInfos.get(key)));
KafkaConsumerFactory<K, V> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory);
when(contextMock.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
when(contextMock.getThisTaskIndex()).thenReturn(0);
-
+
spout.open(topoConf, contextMock, collectorMock);
spout.activate();
verify(consumerMock).assign(assignedPartitions);
-
+
return spout;
}
-
+
+ /**
+ * Creates sequential dummy records.
+ *
+ * @param <K> The Kafka key type
+ * @param <V> The Kafka value type
+ * @param topic The topic partition to create records for
+ * @param startingOffset The starting offset of the records
+ * @param numRecords The number of records to create
+ * @return The dummy records
+ */
+ public static <K, V> List<ConsumerRecord<K, V>> createRecords(TopicPartition topic, long startingOffset, int numRecords) {
+ List<ConsumerRecord<K, V>> recordsForPartition = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ recordsForPartition.add(new ConsumerRecord<>(topic.topic(), topic.partition(), startingOffset + i, null, null));
+ }
+ return recordsForPartition;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
new file mode 100644
index 0000000..a5c78a8
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
+import org.json.simple.JSONValue;
+import org.junit.Test;
+
+public class KafkaTridentSpoutBatchMetadataTest {
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testMetadataIsRoundTripSerializableWithJsonSimple() throws Exception {
+ /**
+ * Tests that the metadata object can be converted to and from a Map. This is needed because Trident metadata is written to
+ * Zookeeper as JSON with the json-simple library, so the spout converts the metadata to Map before returning it to Trident.
+ * It is important that all map entries are types json-simple knows about,
+ * since otherwise the library just calls toString on them which will likely produce invalid JSON.
+ */
+ TopicPartition tp = new TopicPartition("topic", 0);
+ long startOffset = 10;
+ long endOffset = 20;
+
+ KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(tp, startOffset, endOffset);
+ Map<String, Object> map = metadata.toMap();
+ Map deserializedMap = (Map)JSONValue.parseWithException(JSONValue.toJSONString(map));
+ KafkaTridentSpoutBatchMetadata deserializedMetadata = KafkaTridentSpoutBatchMetadata.fromMap(deserializedMap);
+ assertThat(deserializedMetadata.getTopicPartition(), is(metadata.getTopicPartition()));
+ assertThat(deserializedMetadata.getFirstOffset(), is(metadata.getFirstOffset()));
+ assertThat(deserializedMetadata.getLastOffset(), is(metadata.getLastOffset()));
+ }
+
+ @Test
+ public void testCreateMetadataFromRecords() {
+ TopicPartition tp = new TopicPartition("topic", 0);
+ long firstOffset = 15;
+ long lastOffset = 55;
+ ConsumerRecords<?, ?> records = new ConsumerRecords<>(Collections.singletonMap(tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, (int) (lastOffset - firstOffset + 1))));
+
+ KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(tp, records);
+ assertThat("The first offset should be the first offset in the record set", metadata.getFirstOffset(), is(firstOffset));
+ assertThat("The last offset should be the last offset in the record set", metadata.getLastOffset(), is(lastOffset));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java b/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
index 789b615..5fe3c65 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
@@ -30,12 +30,32 @@ import java.util.Map;
* This defines a transactional spout which does *not* necessarily
* replay the same batch every time it emits a batch for a transaction id.
*
+ * @param <M> The type of metadata object passed to the Emitter when emitting a new batch based on a previous batch. This type must be JSON
+ * serializable by json-simple.
+ * @param <Partitions> The type of metadata object used by the coordinator to describe partitions. This type must be JSON serializable by
+ * json-simple.
*/
public interface IOpaquePartitionedTridentSpout<Partitions, Partition extends ISpoutPartition, M>
extends ITridentDataSource {
+ /**
+ * Coordinator for batches. Trident will only begin committing once at least one coordinator is ready.
+ *
+ * @param <Partitions> The type of metadata object used by the coordinator to describe partitions. This type must be JSON serializable
+ * by json-simple.
+ */
interface Coordinator<Partitions> {
+ /**
+ * Indicates whether this coordinator is ready to commit the given transaction.
+ * The master batch coordinator will only begin committing if at least one coordinator indicates it is ready to commit.
+ * @param txid The transaction id
+ * @return true if this coordinator is ready to commit, false otherwise.
+ */
boolean isReady(long txid);
+ /**
+ * Gets the partitions for the following batches. The emitter will be asked to refresh partitions when this value changes.
+ * @return The partitions for the following batches.
+ */
Partitions getPartitionsForBatch();
void close();
}
@@ -79,7 +99,7 @@ public interface IOpaquePartitionedTridentSpout<Partitions, Partition extends IS
Emitter<Partitions, Partition, M> getEmitter(Map<String, Object> conf, TopologyContext context);
- Coordinator getCoordinator(Map<String, Object> conf, TopologyContext context);
+ Coordinator<Partitions> getCoordinator(Map<String, Object> conf, TopologyContext context);
Map<String, Object> getComponentConfiguration();
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index ce81334..bb05450 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.json.simple.parser.ParseException;
/**
* Class that contains the logic to extract the transactional state info from zookeeper. All transactional state
@@ -170,9 +171,14 @@ public class TransactionalState {
try {
Object data;
if(_curator.checkExists().forPath(path)!=null) {
- // intentionally using parse() instead of parseWithException() to handle error cases as null
- // this have been used from the start of Trident so we could treat it as safer way
- data = JSONValue.parse(new String(_curator.getData().forPath(path), "UTF-8"));
+ // Use parseWithException instead of parse so we can capture deserialization errors in the log.
+ // They are likely to be bugs in the spout code.
+ try{
+ data = JSONValue.parseWithException(new String(_curator.getData().forPath(path), "UTF-8"));
+ } catch (ParseException e) {
+ LOG.warn("Failed to deserialize zookeeper data for path {}", path, e);
+ data = null;
+ }
} else {
data = null;
}
[2/2] storm git commit: Merge branch 'STORM-2675' of
https://github.com/srdo/storm into STORM-2675
Posted by bo...@apache.org.
Merge branch 'STORM-2675' of https://github.com/srdo/storm into STORM-2675
STORM-2675: Fix storm-kafka-client Trident spout failing to serialize
meta objects to Zookeeper
This closes #2271
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ea0e465a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ea0e465a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ea0e465a
Branch: refs/heads/master
Commit: ea0e465ab863668f0b04aba7d01050aeb4b13d22
Parents: 306f399 54a829c
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Sep 8 08:56:38 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Sep 8 08:56:38 2017 -0500
----------------------------------------------------------------------
examples/storm-kafka-client-examples/pom.xml | 11 +-
examples/storm-kafka-examples/pom.xml | 11 +-
external/storm-kafka-client/pom.xml | 2 +-
.../kafka/spout/subscription/Subscription.java | 5 +-
.../trident/KafkaTridentSpoutBatchMetadata.java | 77 +++++++++----
.../spout/trident/KafkaTridentSpoutEmitter.java | 108 +++++++++++--------
.../spout/trident/KafkaTridentSpoutManager.java | 10 +-
.../spout/trident/KafkaTridentSpoutOpaque.java | 12 +--
.../KafkaTridentSpoutOpaqueCoordinator.java | 17 +--
.../spout/trident/TopicPartitionSerializer.java | 47 ++++++++
.../storm/kafka/trident/TridentKafkaState.java | 9 ++
.../storm/kafka/spout/KafkaSpoutConfigTest.java | 2 +-
.../SpoutWithMockedConsumerSetupHelper.java | 33 ++++--
.../KafkaTridentSpoutBatchMetadataTest.java | 66 ++++++++++++
.../spout/IOpaquePartitionedTridentSpout.java | 22 +++-
.../topology/state/TransactionalState.java | 12 ++-
16 files changed, 334 insertions(+), 110 deletions(-)
----------------------------------------------------------------------