Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/08 14:25:15 UTC

[1/2] storm git commit: STORM-2675: Fix storm-kafka-client Trident spout failing to serialize meta objects to Zookeeper

Repository: storm
Updated Branches:
  refs/heads/master 306f399e9 -> ea0e465ab


STORM-2675: Fix storm-kafka-client Trident spout failing to serialize meta objects to Zookeeper


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/54a829ce
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/54a829ce
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/54a829ce

Branch: refs/heads/master
Commit: 54a829ceba6c1575d0665721509889e4b60dd066
Parents: e1dd247
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Aug 4 02:53:42 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Aug 30 18:54:08 2017 +0200

----------------------------------------------------------------------
 examples/storm-kafka-client-examples/pom.xml    |  11 +-
 examples/storm-kafka-examples/pom.xml           |  11 +-
 external/storm-kafka-client/pom.xml             |   2 +-
 .../kafka/spout/subscription/Subscription.java  |   5 +-
 .../trident/KafkaTridentSpoutBatchMetadata.java |  77 +++++++++----
 .../spout/trident/KafkaTridentSpoutEmitter.java | 108 +++++++++++--------
 .../spout/trident/KafkaTridentSpoutManager.java |  10 +-
 .../spout/trident/KafkaTridentSpoutOpaque.java  |  12 +--
 .../KafkaTridentSpoutOpaqueCoordinator.java     |  17 +--
 .../spout/trident/TopicPartitionSerializer.java |  47 ++++++++
 .../storm/kafka/trident/TridentKafkaState.java  |   9 ++
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |   2 +-
 .../SpoutWithMockedConsumerSetupHelper.java     |  33 ++++--
 .../KafkaTridentSpoutBatchMetadataTest.java     |  66 ++++++++++++
 .../spout/IOpaquePartitionedTridentSpout.java   |  22 +++-
 .../topology/state/TransactionalState.java      |  12 ++-
 16 files changed, 334 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/examples/storm-kafka-client-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/pom.xml b/examples/storm-kafka-client-examples/pom.xml
index af3eb9d..c6836d0 100644
--- a/examples/storm-kafka-client-examples/pom.xml
+++ b/examples/storm-kafka-client-examples/pom.xml
@@ -28,11 +28,6 @@
     </parent>
 
     <modelVersion>4.0.0</modelVersion>
-    
-    <properties>
-        <!-- Override with -Dkafka.dependency.scope=provided to generate a jar without dependencies -->
-        <kafka.dependency.scope>compile</kafka.dependency.scope>
-    </properties>
 
     <artifactId>storm-kafka-client-examples</artifactId>
 
@@ -47,19 +42,19 @@
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-kafka-client</artifactId>
             <version>${project.version}</version>
-            <scope>${kafka.dependency.scope}</scope>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>${storm.kafka.artifact.id}</artifactId>
             <version>${storm.kafka.client.version}</version>
-            <scope>${kafka.dependency.scope}</scope>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
             <version>${storm.kafka.client.version}</version>
-            <scope>${kafka.dependency.scope}</scope>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/examples/storm-kafka-examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/pom.xml b/examples/storm-kafka-examples/pom.xml
index eb7bd22..13b5573 100644
--- a/examples/storm-kafka-examples/pom.xml
+++ b/examples/storm-kafka-examples/pom.xml
@@ -24,11 +24,6 @@
         <version>2.0.0-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
-    
-    <properties>
-        <!-- Override with -Dkafka.dependency.scope=provided to generate a jar without dependencies -->
-        <kafka.dependency.scope>compile</kafka.dependency.scope>
-    </properties>
 
     <artifactId>storm-kafka-examples</artifactId>
 
@@ -43,19 +38,19 @@
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-kafka</artifactId>
             <version>${project.version}</version>
-            <scope>${kafka.dependency.scope}</scope>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>${storm.kafka.artifact.id}</artifactId>
             <version>${storm.kafka.version}</version>
-            <scope>${kafka.dependency.scope}</scope>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
             <version>${storm.kafka.version}</version>
-            <scope>${kafka.dependency.scope}</scope>
+            <scope>compile</scope>
         </dependency>
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 2f17ea1..c3bd457 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -149,7 +149,7 @@
                 <artifactId>maven-checkstyle-plugin</artifactId>
                 <!--Note - the version would be inherited-->
                 <configuration>
-                    <maxAllowedViolations>6</maxAllowedViolations>
+                    <maxAllowedViolations>0</maxAllowedViolations>
                 </configuration>
             </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
index 8091bfa..55e1c63 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
@@ -30,7 +30,10 @@ public abstract class Subscription implements Serializable {
     private static final long serialVersionUID = -216136367240198716L;
 
     /**
-     * Subscribe the KafkaConsumer to the proper topics
+     * Subscribe the KafkaConsumer to the proper topics.
+     * Implementations must ensure that a given topic partition is always assigned to the same spout task.
+     * Adding and removing partitions as necessary is fine, but partitions must not move from one task to another.
+     * This constraint is only important for use with the Trident spout.
      * @param consumer the Consumer to get.
      * @param listener the rebalance listener to include in the subscription
      */

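The constraint documented above is easiest to see as a pure function from partition to task. A minimal sketch of such a mapping (illustrative only, not part of this commit; the class and method names are hypothetical):

    // Hypothetical helper, not part of this commit. Sorting first means every
    // task computes the same global order, so a given partition always maps to
    // the same task index and never migrates between tasks.
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;

    public final class StickyAssignment {
        private StickyAssignment() {
        }

        /** Returns the partitions owned by taskIndex out of totalTasks. */
        public static List<TopicPartition> forTask(List<TopicPartition> all, int taskIndex, int totalTasks) {
            List<TopicPartition> sorted = new ArrayList<>(all);
            sorted.sort(Comparator.comparing(TopicPartition::topic)
                .thenComparingInt(TopicPartition::partition));
            List<TopicPartition> mine = new ArrayList<>();
            for (int i = 0; i < sorted.size(); i++) {
                if (i % totalTasks == taskIndex) {
                    mine.add(sorted.get(i));
                }
            }
            return mine;
        }
    }
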
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
index df56016..9ba76d7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
@@ -19,7 +19,10 @@
 package org.apache.storm.kafka.spout.trident;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang.Validate;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
@@ -29,38 +32,47 @@ import org.slf4j.LoggerFactory;
 /**
  * Wraps transaction batch information.
  */
-public class KafkaTridentSpoutBatchMetadata<K,V> implements Serializable {
+public class KafkaTridentSpoutBatchMetadata implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutBatchMetadata.class);
-
+    private static final TopicPartitionSerializer TP_SERIALIZER = new TopicPartitionSerializer();
+    
+    public static final String TOPIC_PARTITION_KEY = "tp";
+    public static final String FIRST_OFFSET_KEY = "firstOffset";
+    public static final String LAST_OFFSET_KEY = "lastOffset";
+    
     // topic partition of this batch
-    private TopicPartition topicPartition;  
+    private final TopicPartition topicPartition;  
     // first offset of this batch
-    private long firstOffset;               
+    private final long firstOffset;               
     // last offset of this batch
-    private long lastOffset;
+    private final long lastOffset;
 
+    /**
+     * Builds a metadata object.
+     * @param topicPartition The topic partition
+     * @param firstOffset The first offset for the batch
+     * @param lastOffset The last offset for the batch
+     */
     public KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, long firstOffset, long lastOffset) {
         this.topicPartition = topicPartition;
         this.firstOffset = firstOffset;
         this.lastOffset = lastOffset;
     }
 
-    public KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, ConsumerRecords<K, V> consumerRecords,
-            KafkaTridentSpoutBatchMetadata<K, V> lastBatch) {
-        this.topicPartition = topicPartition;
-
+    /**
+     * Builds a metadata object from a non-empty set of records.
+     * @param topicPartition The topic partition the records belong to.
+     * @param consumerRecords The non-empty set of records.
+     */
+    public <K, V> KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, ConsumerRecords<K, V> consumerRecords) {
+        Validate.notNull(consumerRecords.records(topicPartition));
         List<ConsumerRecord<K, V>> records = consumerRecords.records(topicPartition);
-
-        if (records != null && !records.isEmpty()) {
-            firstOffset = records.get(0).offset();
-            lastOffset = records.get(records.size() - 1).offset();
-        } else {
-            if (lastBatch != null) {
-                firstOffset = lastBatch.firstOffset;
-                lastOffset = lastBatch.lastOffset;
-            }
-        }
-        LOG.debug("Created {}", this);
+        Validate.isTrue(!records.isEmpty(), "There must be at least one record in order to build metadata");
+        
+        this.topicPartition = topicPartition;
+        firstOffset = records.get(0).offset();
+        lastOffset = records.get(records.size() - 1).offset();
+        LOG.debug("Created {}", this.toString());
     }
 
     public long getFirstOffset() {
@@ -74,9 +86,32 @@ public class KafkaTridentSpoutBatchMetadata<K,V> implements Serializable {
     public TopicPartition getTopicPartition() {
         return topicPartition;
     }
+    
+    /**
+     * Constructs a metadata object from a Map in the format produced by {@link #toMap() }.
+     * @param map The source map
+     * @return A new metadata object
+     */
+    public static KafkaTridentSpoutBatchMetadata fromMap(Map<String, Object> map) {
+        Map<String, Object> topicPartitionMap = (Map<String, Object>)map.get(TOPIC_PARTITION_KEY);
+        TopicPartition tp = TP_SERIALIZER.fromMap(topicPartitionMap);
+        return new KafkaTridentSpoutBatchMetadata(tp, ((Number)map.get(FIRST_OFFSET_KEY)).longValue(),
+            ((Number)map.get(LAST_OFFSET_KEY)).longValue());
+    }
+    
+    /**
+     * Writes this metadata object to a Map so Trident can read/write it to Zookeeper.
+     */
+    public Map<String, Object> toMap() {
+        Map<String, Object> map = new HashMap<>();
+        map.put(TOPIC_PARTITION_KEY, TP_SERIALIZER.toMap(topicPartition));
+        map.put(FIRST_OFFSET_KEY, firstOffset);
+        map.put(LAST_OFFSET_KEY, lastOffset);
+        return map;
+    }
 
     @Override
-    public String toString() {
+    public final String toString() {
         return super.toString()
                 + "{topicPartition=" + topicPartition
                 + ", firstOffset=" + firstOffset

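For context on the new toMap()/fromMap() pair: Trident persists this metadata to Zookeeper as JSON via json-simple, which only knows how to encode Maps, Lists, Strings, Numbers, Booleans and null, and falls back to toString() for everything else. A sketch of the round trip (not part of this commit; Trident performs the JSON step itself):

    // Hedged sketch, not part of this commit. json-simple is invoked directly
    // here only to mirror what Trident does when writing metadata to Zookeeper.
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutBatchMetadata;
    import org.json.simple.JSONValue;

    public class MetadataRoundTrip {
        public static void main(String[] args) throws Exception {
            KafkaTridentSpoutBatchMetadata meta =
                new KafkaTridentSpoutBatchMetadata(new TopicPartition("topic", 0), 10, 20);
            // Every entry is a String, a Number or a nested Map of those, so
            // json-simple can encode it without the toString() fallback.
            String json = JSONValue.toJSONString(meta.toMap());
            @SuppressWarnings("unchecked")
            Map<String, Object> parsed = (Map<String, Object>) JSONValue.parseWithException(json);
            KafkaTridentSpoutBatchMetadata restored = KafkaTridentSpoutBatchMetadata.fromMap(parsed);
            System.out.println(restored); // same topic partition and offsets as meta
        }
    }
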
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index 5351f79..a45eff8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -27,14 +27,16 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.RecordTranslator;
@@ -47,9 +49,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTridentSpout.Emitter<
-        List<TopicPartition>,
+        List<Map<String, Object>>,
         KafkaTridentSpoutTopicPartition,
-        KafkaTridentSpoutBatchMetadata<K, V>>,
+        Map<String, Object>>,
         Serializable {
 
     private static final long serialVersionUID = -7343927794834130435L;
@@ -60,16 +62,24 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
 
     // Bookkeeping
     private final KafkaTridentSpoutManager<K, V> kafkaManager;
-    private Set<TopicPartition> firstPoll = new HashSet<>();        // set of topic-partitions for which first poll has already occurred
+    // set of topic-partitions for which first poll has already occurred, and the first polled txid
+    private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<>(); 
 
     // Declare some KafkaTridentSpoutManager references for convenience
     private final long pollTimeoutMs;
     private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
     private final RecordTranslator<K, V> translator;
     private final Timer refreshSubscriptionTimer;
+    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
 
     private TopologyContext topologyContext;
 
+    /**
+     * Create a new Kafka spout emitter.
+     * @param kafkaManager The Kafka consumer manager to use
+     * @param topologyContext The topology context
+     * @param refreshSubscriptionTimer The timer for deciding when to recheck the subscription
+     */
     public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext,
             Timer refreshSubscriptionTimer) {
         this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
@@ -81,7 +91,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
         final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
         this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
         this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
-        LOG.debug("Created {}", this);
+        LOG.debug("Created {}", this.toString());
     }
 
     /**
@@ -93,15 +103,16 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
     }
 
     @Override
-    public KafkaTridentSpoutBatchMetadata<K, V> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
-            KafkaTridentSpoutTopicPartition currBatchPartition, KafkaTridentSpoutBatchMetadata<K, V> lastBatch) {
+    public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
+            KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {
 
         LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
                 tx, currBatchPartition, lastBatch, collector);
 
         final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
         final Set<TopicPartition> assignments = kafkaConsumer.assignment();
-        KafkaTridentSpoutBatchMetadata<K, V> currentBatch = lastBatch;
+        KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
+        KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
         Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();
 
         if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) {
@@ -114,7 +125,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
                 // pause other topic-partitions to only poll from current topic-partition
                 pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
 
-                seek(currBatchTp, lastBatch);
+                seek(currBatchTp, lastBatchMeta, tx.getTransactionId());
 
                 // poll
                 if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
@@ -127,7 +138,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
                 if (!records.isEmpty()) {
                     emitTuples(collector, records);
                     // build new metadata
-                    currentBatch = new KafkaTridentSpoutBatchMetadata<>(currBatchTp, records, lastBatch);
+                    currentBatch = new KafkaTridentSpoutBatchMetadata(currBatchTp, records);
                 }
             } finally {
                 kafkaConsumer.resume(pausedTopicPartitions);
@@ -137,7 +148,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
                     + "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
         }
 
-        return currentBatch;
+        return currentBatch == null ? null : currentBatch.toMap();
     }
 
     private void emitTuples(TridentCollector collector, ConsumerRecords<K, V> records) {
@@ -149,46 +160,49 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
     }
 
     /**
-     * Determines the offset of the next fetch. For failed batches lastBatchMeta is not null and contains the fetch
-     * offset of the failed batch. In this scenario the next fetch will take place at offset of the failed batch + 1.
-     * When the previous batch is successful, lastBatchMeta is null, and the offset of the next fetch is either the
-     * offset of the last commit to kafka, or if no commit was yet made, the offset dictated by
-     * {@link KafkaSpoutConfig.FirstPollOffsetStrategy}
+     * Determines the offset of the next fetch. Will use the firstPollOffsetStrategy if this is the first poll for the topic partition.
+     * Otherwise the next offset will be one past the last batch, based on lastBatchMeta.
+     * 
+     * <p>lastBatchMeta should only be null when the previous txid was not emitted (e.g. new topic),
+     * it is the first poll for the spout instance, or it is a replay of the first txid this spout emitted on this partition.
+     * In the second case, there are either no previous transactions, or the master batch coordinator (MBC) is still committing them
+     * and they will fail because this spout did not emit the corresponding batches. If it had emitted them, the meta could not be null. 
+     * In any case, the lastBatchMeta should never be null if this is not the first poll for this spout instance.
      *
      * @return the offset of the next fetch
      */
-    private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata<K, V> lastBatchMeta) {
-        if (lastBatchMeta != null) {
-            kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
-            LOG.debug("Seeking fetch offset to next offset after last offset from previous batch for topic-partition [{}]", tp);
-        } else if (isFirstPoll(tp)) {
-            LOG.debug("Seeking fetch offset from firstPollOffsetStrategy and last commit to Kafka for topic-partition [{}]", tp);
-            firstPoll.add(tp);
-            final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
-            if (committedOffset != null) {             // offset was committed for this TopicPartition
-                if (firstPollOffsetStrategy.equals(EARLIEST)) {
-                    kafkaConsumer.seekToBeginning(Collections.singleton(tp));
-                } else if (firstPollOffsetStrategy.equals(LATEST)) {
-                    kafkaConsumer.seekToEnd(Collections.singleton(tp));
-                } else {
-                    // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
-                    kafkaConsumer.seek(tp, committedOffset.offset() + 1);
-                }
-            } else {    // no commits have ever been done, so start at the beginning or end depending on the strategy
-                if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
-                    kafkaConsumer.seekToBeginning(Collections.singleton(tp));
-                } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
-                    kafkaConsumer.seekToEnd(Collections.singleton(tp));
-                }
+    private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta, long transactionId) {
+        if (isFirstPoll(tp, transactionId)) {
+            if (firstPollOffsetStrategy == EARLIEST) {
+                LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
+                kafkaConsumer.seekToBeginning(Collections.singleton(tp));
+            } else if (firstPollOffsetStrategy == LATEST) {
+                LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
+                kafkaConsumer.seekToEnd(Collections.singleton(tp));
+            } else if (lastBatchMeta != null) {
+                LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
+                kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
+            } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
+                LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
+                kafkaConsumer.seekToBeginning(Collections.singleton(tp));
+            } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) {
+                LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp);
+                kafkaConsumer.seekToEnd(Collections.singleton(tp));
             }
+            firstPollTransaction.put(tp, transactionId);
+        } else {
+            kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
+            LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
         }
+
         final long fetchOffset = kafkaConsumer.position(tp);
         LOG.debug("Set [fetchOffset = {}]", fetchOffset);
         return fetchOffset;
     }
 
-    private boolean isFirstPoll(TopicPartition tp) {
-        return !firstPoll.contains(tp);
+    private boolean isFirstPoll(TopicPartition tp, long txid) {
+        // The first poll is either the "real" first transaction, or a replay of the first transaction
+        return !firstPollTransaction.containsKey(tp) || firstPollTransaction.get(tp) == txid;
     }
 
     // returns paused topic-partitions.
@@ -215,15 +229,19 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
      * @return ordered list of topic partitions for this task
      */
     @Override
-    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<TopicPartition> allPartitionInfo) {
-        final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allPartitionInfo);
+    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map<String, Object>> allPartitionInfo) {
+        List<TopicPartition> allTopicPartitions = allPartitionInfo.stream()
+            .map(map -> tpSerializer.fromMap(map))
+            .collect(Collectors.toList());
+        final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allTopicPartitions);
         LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
                 allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
         return allPartitions;
     }
 
     @Override
-    public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks, List<TopicPartition> allPartitionInfo) {
+    public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
+        List<Map<String, Object>> allPartitionInfo) {
         final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
         LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
         final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps);
@@ -253,7 +271,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
     }
 
     @Override
-    public String toString() {
+    public final String toString() {
         return super.toString()
                 + "{kafkaManager=" + kafkaManager
                 + '}';

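The seek() logic above only consults firstPollOffsetStrategy on the first poll for a partition (or a replay of that first transaction); afterwards it always resumes one offset past lastBatchMeta. A sketch of how the strategy is chosen when building the spout config (not part of this commit; the builder method name is assumed from the KafkaSpoutConfig API of this version):

    // Hedged sketch, not part of this commit.
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;

    public class EmitterConfigExample {
        public static KafkaSpoutConfig<String, String> config() {
            return KafkaSpoutConfig.builder("localhost:1234", "topic")
                // EARLIEST/LATEST always win on the first poll, even over saved
                // metadata; the UNCOMMITTED_* strategies only apply when no
                // last-batch metadata exists for the partition.
                .setFirstPollOffsetStrategy(
                    KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                .build();
        }
    }
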
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
index 30d52cb..b5138c2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
@@ -42,10 +42,14 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
     // Declare some KafkaSpoutConfig references for convenience
     private Fields fields;
 
+    /**
+     * Create a KafkaConsumer manager for the trident spout.
+     * @param kafkaSpoutConfig The consumer config
+     */
     public KafkaTridentSpoutManager(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;
         this.fields = getFields();
-        LOG.debug("Created {}", this);
+        LOG.debug("Created {}", this.toString());
     }
 
     KafkaConsumer<K,V> createAndSubscribeKafkaConsumer(TopologyContext context) {
@@ -63,7 +67,7 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
         return KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.getTopicPartitions();
     }
 
-    Fields getFields() {
+    final Fields getFields() {
         if (fields == null) {
             RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
             Fields fs = null;
@@ -87,7 +91,7 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
     }
 
     @Override
-    public String toString() {
+    public final String toString() {
         return super.toString()
                 + "{kafkaConsumer=" + kafkaConsumer
                 + ", kafkaSpoutConfig=" + kafkaSpoutConfig

http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
index 4f49c7d..8d33e39 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
@@ -28,8 +28,8 @@ import org.apache.storm.tuple.Fields;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSpout<List<TopicPartition>,
-        KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>> {
+public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSpout<List<Map<String, Object>>,
+        KafkaTridentSpoutTopicPartition, Map<String, Object>> {
     private static final long serialVersionUID = -8003272486566259640L;
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
@@ -42,17 +42,17 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
     
     public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager<K, V> kafkaManager) {
         this.kafkaManager = kafkaManager;
-        LOG.debug("Created {}", this);
+        LOG.debug("Created {}", this.toString());
     }
 
     @Override
-    public Emitter<List<TopicPartition>, KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>> getEmitter(
+    public Emitter<List<Map<String, Object>>, KafkaTridentSpoutTopicPartition, Map<String, Object>> getEmitter(
             Map<String, Object> conf, TopologyContext context) {
         return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
     }
 
     @Override
-    public Coordinator<List<TopicPartition>> getCoordinator(Map<String, Object> conf, TopologyContext context) {
+    public Coordinator<List<Map<String, Object>>> getCoordinator(Map<String, Object> conf, TopologyContext context) {
         return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
     }
 
@@ -69,7 +69,7 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
     }
 
     @Override
-    public String toString() {
+    public final String toString() {
         return super.toString()
                 + "{kafkaManager=" + kafkaManager + '}';
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
index c61cdcd..17732c2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
@@ -21,20 +21,23 @@ package org.apache.storm.kafka.spout.trident;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartitionedTridentSpout.Coordinator<List<TopicPartition>>,
+public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartitionedTridentSpout.Coordinator<List<Map<String, Object>>>,
         Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class);
 
-    private KafkaTridentSpoutManager<K,V> kafkaManager;
+    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
+    private final KafkaTridentSpoutManager<K,V> kafkaManager;
 
     public KafkaTridentSpoutOpaqueCoordinator(KafkaTridentSpoutManager<K, V> kafkaManager) {
         this.kafkaManager = kafkaManager;
-        LOG.debug("Created {}", this);
+        LOG.debug("Created {}", this.toString());
     }
 
     @Override
@@ -44,10 +47,12 @@ public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartition
     }
 
     @Override
-    public List<TopicPartition> getPartitionsForBatch() {
+    public List<Map<String, Object>> getPartitionsForBatch() {
         final ArrayList<TopicPartition> topicPartitions = new ArrayList<>(kafkaManager.getTopicPartitions());
         LOG.debug("TopicPartitions for batch {}", topicPartitions);
-        return topicPartitions;
+        return topicPartitions.stream()
+            .map(tp -> tpSerializer.toMap(tp))
+            .collect(Collectors.toList());
     }
 
     @Override
@@ -56,7 +61,7 @@ public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartition
     }
 
     @Override
-    public String toString() {
+    public final String toString() {
         return super.toString()
                 + "{kafkaManager=" + kafkaManager
                 + '}';

http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
new file mode 100644
index 0000000..50e78f0
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
+
+public class TopicPartitionSerializer {
+
+    public static final String TOPIC_PARTITION_TOPIC_KEY = "topic";
+    public static final String TOPIC_PARTITION_PARTITION_KEY = "partition";
+
+    /**
+     * Serializes the given TopicPartition to Map so Trident can serialize it to JSON.
+     */
+    public Map<String, Object> toMap(TopicPartition topicPartition) {
+        Map<String, Object> topicPartitionMap = new HashMap<>();
+        topicPartitionMap.put(TOPIC_PARTITION_TOPIC_KEY, topicPartition.topic());
+        topicPartitionMap.put(TOPIC_PARTITION_PARTITION_KEY, topicPartition.partition());
+        return topicPartitionMap;
+    }
+
+    /**
+     * Deserializes the given map into a TopicPartition. The map keys are expected to be those produced by
+     * {@link #toMap(org.apache.kafka.common.TopicPartition)}.
+     */
+    public TopicPartition fromMap(Map<String, Object> map) {
+        return new TopicPartition((String) map.get(TOPIC_PARTITION_TOPIC_KEY),
+            ((Number) map.get(TOPIC_PARTITION_PARTITION_KEY)).intValue());
+    }
+
+}

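A brief usage sketch for the new serializer (not part of this commit):

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.trident.TopicPartitionSerializer;

    public class TopicPartitionSerializerExample {
        public static void main(String[] args) {
            TopicPartitionSerializer serializer = new TopicPartitionSerializer();
            Map<String, Object> map = serializer.toMap(new TopicPartition("topic", 5));
            // map holds only a String and an int, both JSON-friendly for json-simple.
            TopicPartition restored = serializer.fromMap(map);
            System.out.println(restored); // topic-5
        }
    }
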
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
index f967609..c221f9e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -66,12 +66,21 @@ public class TridentKafkaState implements State {
         LOG.debug("commit is Noop.");
     }
 
+    /**
+     * Prepare this State.
+     * @param options The KafkaProducer config.
+     */
     public void prepare(Properties options) {
         Objects.requireNonNull(mapper, "mapper can not be null");
         Objects.requireNonNull(topicSelector, "topicSelector can not be null");
         producer = new KafkaProducer(options);
     }
 
+    /**
+     * Write the given tuples to Kafka.
+     * @param tuples The tuples to write.
+     * @param collector The Trident collector.
+     */
     public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
         String topic = null;
         try {

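For reference, prepare() above expects standard kafka-clients producer properties. A hedged sketch of a typical config (the broker address and serializers are illustrative choices, not part of this commit):

    import java.util.Properties;

    public class ProducerPropsExample {
        public static Properties kafkaProducerProps() {
            Properties props = new Properties();
            // Standard kafka-clients producer settings; values are examples only.
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return props;
        }
    }
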
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index 513db90..051d212 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -44,7 +44,7 @@ public class KafkaSpoutConfigTest {
     }
 
     @Test
-    public void test_setEmitNullTuples_true_true() {
+    public void testSetEmitNullTuplesToTrue() {
         final KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
                 .setEmitNullTuples(true)
                 .build();

http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
index aa65d0f..a5d3c54 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -20,11 +20,13 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -33,9 +35,10 @@ import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 
 public class SpoutWithMockedConsumerSetupHelper {
-    
+
     /**
      * Creates, opens and activates a KafkaSpout using a mocked consumer.
+     *
      * @param <K> The Kafka key type
      * @param <V> The Kafka value type
      * @param spoutConfig The spout config to use
@@ -47,27 +50,45 @@ public class SpoutWithMockedConsumerSetupHelper {
      * @return The spout
      */
     public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf,
-        TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) {     
+        TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) {
 
         Map<String, List<PartitionInfo>> partitionInfos = assignedPartitions.stream()
             .map(tp -> new PartitionInfo(tp.topic(), tp.partition(), null, null, null))
             .collect(Collectors.groupingBy(info -> info.topic()));
         partitionInfos.keySet()
             .forEach(key -> when(consumerMock.partitionsFor(key))
-                .thenReturn(partitionInfos.get(key)));
+            .thenReturn(partitionInfos.get(key)));
         KafkaConsumerFactory<K, V> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
 
         KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory);
 
         when(contextMock.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
         when(contextMock.getThisTaskIndex()).thenReturn(0);
-        
+
         spout.open(topoConf, contextMock, collectorMock);
         spout.activate();
 
         verify(consumerMock).assign(assignedPartitions);
-        
+
         return spout;
     }
-    
+
+    /**
+     * Creates sequential dummy records.
+     *
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param topic The topic partition to create records for
+     * @param startingOffset The starting offset of the records
+     * @param numRecords The number of records to create
+     * @return The dummy records
+     */
+    public static <K, V> List<ConsumerRecord<K, V>> createRecords(TopicPartition topic, long startingOffset, int numRecords) {
+        List<ConsumerRecord<K, V>> recordsForPartition = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            recordsForPartition.add(new ConsumerRecord<>(topic.topic(), topic.partition(), startingOffset + i, null, null));
+        }
+        return recordsForPartition;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
new file mode 100644
index 0000000..a5c78a8
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
+import org.json.simple.JSONValue;
+import org.junit.Test;
+
+public class KafkaTridentSpoutBatchMetadataTest {
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testMetadataIsRoundTripSerializableWithJsonSimple() throws Exception {
+        /**
+         * Tests that the metadata object can be converted to and from a Map. This is needed because Trident metadata is written to
+         * Zookeeper as JSON with the json-simple library, so the spout converts the metadata to Map before returning it to Trident.
+         * It is important that all map entries are types json-simple knows about,
+         * since otherwise the library just calls toString on them which will likely produce invalid JSON.
+         */
+        TopicPartition tp = new TopicPartition("topic", 0);
+        long startOffset = 10;
+        long endOffset = 20;
+
+        KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(tp, startOffset, endOffset);
+        Map<String, Object> map = metadata.toMap();
+        Map deserializedMap = (Map)JSONValue.parseWithException(JSONValue.toJSONString(map));
+        KafkaTridentSpoutBatchMetadata deserializedMetadata = KafkaTridentSpoutBatchMetadata.fromMap(deserializedMap);
+        assertThat(deserializedMetadata.getTopicPartition(), is(metadata.getTopicPartition()));
+        assertThat(deserializedMetadata.getFirstOffset(), is(metadata.getFirstOffset()));
+        assertThat(deserializedMetadata.getLastOffset(), is(metadata.getLastOffset()));
+    }
+
+    @Test
+    public void testCreateMetadataFromRecords() {
+        TopicPartition tp = new TopicPartition("topic", 0);
+        long firstOffset = 15;
+        long lastOffset = 55;
+        ConsumerRecords<?, ?> records = new ConsumerRecords<>(Collections.singletonMap(tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, (int) (lastOffset - firstOffset + 1))));
+
+        KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(tp, records);
+        assertThat("The first offset should be the first offset in the record set", metadata.getFirstOffset(), is(firstOffset));
+        assertThat("The last offset should be the last offset in the record set", metadata.getLastOffset(), is(lastOffset));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java b/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
index 789b615..5fe3c65 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
@@ -30,12 +30,32 @@ import java.util.Map;
  * This defines a transactional spout which does *not* necessarily
  * replay the same batch every time it emits a batch for a transaction id.
  * 
+ * @param <M> The type of metadata object passed to the Emitter when emitting a new batch based on a previous batch. This type must be JSON
+ * serializable by json-simple.
+ * @param <Partitions> The type of metadata object used by the coordinator to describe partitions. This type must be JSON serializable by
+ * json-simple.
  */
 public interface IOpaquePartitionedTridentSpout<Partitions, Partition extends ISpoutPartition, M>
     extends ITridentDataSource {
 
+    /**
+     * Coordinator for batches. Trident will only begin committing once at least one coordinator is ready.
+     * 
+     * @param <Partitions> The type of metadata object used by the coordinator to describe partitions. This type must be JSON serializable
+     * by json-simple.
+     */
     interface Coordinator<Partitions> {
+        /**
+         * Indicates whether this coordinator is ready to commit the given transaction.
+         * The master batch coordinator will only begin committing if at least one coordinator indicates it is ready to commit.
+         * @param txid The transaction id
+         * @return true if this coordinator is ready to commit, false otherwise.
+         */
         boolean isReady(long txid);
+        /**
+         * Gets the partitions for the following batches. The emitter will be asked to refresh partitions when this value changes.
+         * @return The partitions for the following batches.
+         */
         Partitions getPartitionsForBatch();
         void close();
     }
@@ -79,7 +99,7 @@ public interface IOpaquePartitionedTridentSpout<Partitions, Partition extends IS
     
     Emitter<Partitions, Partition, M> getEmitter(Map<String, Object> conf, TopologyContext context);
 
-    Coordinator getCoordinator(Map<String, Object> conf, TopologyContext context);
+    Coordinator<Partitions> getCoordinator(Map<String, Object> conf, TopologyContext context);
 
     Map<String, Object> getComponentConfiguration();
 

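To make the new Coordinator contract concrete, a minimal implementation with a fixed partition set (illustrative only, not part of this commit; the List<String> descriptor type is an arbitrary json-simple-serializable choice):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;

    public class StaticPartitionCoordinator
            implements IOpaquePartitionedTridentSpout.Coordinator<List<String>> {

        @Override
        public boolean isReady(long txid) {
            return true; // always ready to commit; real coordinators may gate this
        }

        @Override
        public List<String> getPartitionsForBatch() {
            // A constant value, so the emitter is never asked to refresh partitions.
            return Arrays.asList("partition-0", "partition-1");
        }

        @Override
        public void close() {
            // no resources to release
        }
    }
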
http://git-wip-us.apache.org/repos/asf/storm/blob/54a829ce/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index ce81334..bb05450 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.json.simple.parser.ParseException;
 
 /**
  * Class that contains the logic to extract the transactional state info from zookeeper. All transactional state
@@ -170,9 +171,14 @@ public class TransactionalState {
         try {
             Object data;
             if(_curator.checkExists().forPath(path)!=null) {
-                // intentionally using parse() instead of parseWithException() to handle error cases as null
-                // this have been used from the start of Trident so we could treat it as safer way
-                data = JSONValue.parse(new String(_curator.getData().forPath(path), "UTF-8"));
+                // Use parseWithException instead of parse so we can capture deserialization errors in the log.
+                // They are likely to be bugs in the spout code.
+                try{
+                    data = JSONValue.parseWithException(new String(_curator.getData().forPath(path), "UTF-8"));
+                } catch (ParseException e) {
+                    LOG.warn("Failed to deserialize zookeeper data for path {}", path, e);
+                    data = null;
+                }
             } else {
                 data = null;
             }

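The behavioral difference the change above relies on (a sketch, not part of this commit): JSONValue.parse() swallows malformed input and returns null, while parseWithException() raises a ParseException that can be logged.

    import org.json.simple.JSONValue;
    import org.json.simple.parser.ParseException;

    public class ParseBehaviorExample {
        public static void main(String[] args) {
            String malformed = "{\"firstOffset\": }";
            // parse() hides the problem; the caller only sees null.
            System.out.println(JSONValue.parse(malformed)); // null
            // parseWithException() surfaces it so the spout bug can be diagnosed.
            try {
                JSONValue.parseWithException(malformed);
            } catch (ParseException e) {
                System.out.println("Malformed state data: " + e);
            }
        }
    }
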

[2/2] storm git commit: Merge branch 'STORM-2675' of https://github.com/srdo/storm into STORM-2675

Posted by bo...@apache.org.
Merge branch 'STORM-2675' of https://github.com/srdo/storm into STORM-2675

STORM-2675: Fix storm-kafka-client Trident spout failing to serialize
meta objects to Zookeeper

This closes #2271


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ea0e465a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ea0e465a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ea0e465a

Branch: refs/heads/master
Commit: ea0e465ab863668f0b04aba7d01050aeb4b13d22
Parents: 306f399 54a829c
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Sep 8 08:56:38 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Sep 8 08:56:38 2017 -0500
