You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2018/12/14 15:56:29 UTC

[1/2] storm git commit: STORM-2990, STORM-3279: Fix issue where Kafka Trident spout could ignore EARLIEST and LATEST, and make EARLIEST and LATEST only take effect on topology deploy

Repository: storm
Updated Branches:
  refs/heads/master 43859e9e8 -> e46a87a86


STORM-2990, STORM-3279: Fix issue where Kafka Trident spout could ignore EARLIEST and LATEST, and make EARLIEST and LATEST only take effect on topology deploy


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/642ae369
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/642ae369
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/642ae369

Branch: refs/heads/master
Commit: 642ae3698ce4cc6186b7a694a0f523f5a773fa2e
Parents: 6d871a7
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Wed Nov 14 16:17:03 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Thu Nov 29 19:36:08 2018 +0100

----------------------------------------------------------------------
 external/storm-kafka-client/pom.xml             |   8 +
 .../trident/KafkaTridentSpoutBatchMetadata.java |  13 -
 .../spout/trident/KafkaTridentSpoutEmitter.java |  43 ++-
 .../java/org/apache/storm/kafka/KafkaUnit.java  |  49 ++-
 .../kafka/spout/KafkaSpoutAbstractTest.java     |   1 -
 .../spout/KafkaSpoutMessagingGuaranteeTest.java |   2 +-
 .../KafkaTridentSpoutBatchMetadataTest.java     |  17 -
 .../KafkaTridentSpoutEmitterEmitTest.java       | 306 ++++++++++++++++++
 ...afkaTridentSpoutEmitterPartitioningTest.java | 142 ++++++++
 .../trident/KafkaTridentSpoutEmitterTest.java   | 320 -------------------
 pom.xml                                         |   2 +-
 11 files changed, 508 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index aa72396..b741c83 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -96,6 +96,10 @@
             <artifactId>java-hamcrest</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>log4j-over-slf4j</artifactId>
             <scope>test</scope>
@@ -133,6 +137,10 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+        </dependency>
+        <dependency>
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
index f0d44b5..da37653 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
@@ -55,19 +55,6 @@ public class KafkaTridentSpoutBatchMetadata implements Serializable {
         this.firstOffset = firstOffset;
         this.lastOffset = lastOffset;
         this.topologyId = topologyId;
-    }
-
-    /**
-     * Builds a metadata object from a non-empty set of records.
-     *
-     * @param consumerRecords The non-empty set of records.
-     */
-    public <K, V> KafkaTridentSpoutBatchMetadata(List<ConsumerRecord<K, V>> consumerRecords, String topologyId) {
-        Validate.isTrue(!consumerRecords.isEmpty(), "There must be at least one record in order to build metadata");
-
-        firstOffset = consumerRecords.get(0).offset();
-        lastOffset = consumerRecords.get(consumerRecords.size() - 1).offset();
-        this.topologyId = topologyId;
         LOG.debug("Created {}", this.toString());
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index ed86136..aee8987 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -121,6 +121,13 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
             pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
 
             long seekOffset = currBatchMeta.getFirstOffset();
+            if (seekOffset < 0 && currBatchMeta.getFirstOffset() == currBatchMeta.getLastOffset()) {
+                LOG.debug("Skipping re-emit of batch with negative starting offset."
+                    + " The spout may set a negative starting offset for an empty batch that occurs at the start of a partition."
+                    + " It is not expected that Trident will replay such an empty batch,"
+                    + " but this guard is here in case it tries to do so. See STORM-2990, STORM-3279 for context.");
+                return;
+            }
             LOG.debug("Seeking to offset [{}] for topic partition [{}]", seekOffset, currBatchTp);
             consumer.seek(currBatchTp, seekOffset);
 
@@ -170,15 +177,25 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
 
             seek(currBatchTp, lastBatchMeta);
 
-            final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
-            LOG.debug("Polled [{}] records from Kafka.", records.count());
+            final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
+            LOG.debug("Polled [{}] records from Kafka.", records.size());
 
             if (!records.isEmpty()) {
                 for (ConsumerRecord<K, V> record : records) {
                     emitTuple(collector, record);
                 }
-                // build new metadata
-                currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp), this.topologyContext.getStormId());
+                // build new metadata based on emitted records
+                currentBatch = new KafkaTridentSpoutBatchMetadata(
+                    records.get(0).offset(),
+                    records.get(records.size() - 1).offset(),
+                    topologyContext.getStormId());
+            } else {
+                //Build new metadata based on the consumer position.
+                //We want the next emit to start at the current consumer position,
+                //so make a meta that indicates that position - 1 is the last emitted offset.
+                //This helps us avoid cases like STORM-3279, and simplifies the seek logic.
+                long lastEmittedOffset = consumer.position(currBatchTp) - 1;
+                currentBatch = new KafkaTridentSpoutBatchMetadata(lastEmittedOffset, lastEmittedOffset, topologyContext.getStormId());
             }
         } finally {
             consumer.resume(pausedTopicPartitions);
@@ -187,7 +204,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
         LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], "
             + "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
 
-        return currentBatch == null ? null : currentBatch.toMap();
+        return currentBatch.toMap();
     }
 
     private boolean isFirstPollOffsetStrategyIgnoringCommittedOffsets() {
@@ -218,17 +235,18 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
      * <ul>
      * <li>This is the first batch for this partition</li>
      * <li>This is a replay of the first batch for this partition</li>
-     * <li>This is batch n for this partition, where batch 0...n-1 were all empty</li>
      * </ul>
      *
      * @return the offset of the next fetch
      */
     private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
-        if (isFirstPoll(tp)) {
-            if (firstPollOffsetStrategy == EARLIEST) {
+        if (isFirstPollSinceExecutorStarted(tp)) {
+            boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null 
+                || !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
+            if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
                 LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
                 consumer.seekToBeginning(Collections.singleton(tp));
-            } else if (firstPollOffsetStrategy == LATEST) {
+            } else if (firstPollOffsetStrategy == LATEST && isFirstPollSinceTopologyWasDeployed) {
                 LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
                 consumer.seekToEnd(Collections.singleton(tp));
             } else if (lastBatchMeta != null) {
@@ -247,9 +265,8 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
             LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
         } else {
             /*
-             * Last batch meta is null, but this is not the first batch emitted for this partition by this emitter instance. This is either
-             * a replay of the first batch for this partition, or all previous batches were empty, otherwise last batch meta could not be
-             * null. Use the offset the consumer started at.
+             * Last batch meta is null, but this is not the first batch emitted for this partition by this emitter instance. This is
+             * a replay of the first batch for this partition. Use the offset the consumer started at.
              */
             long initialFetchOffset = tpToFirstSeekOffset.get(tp);
             consumer.seek(tp, initialFetchOffset);
@@ -262,7 +279,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
         return fetchOffset;
     }
 
-    private boolean isFirstPoll(TopicPartition tp) {
+    private boolean isFirstPollSinceExecutorStarted(TopicPartition tp) {
         return !tpToFirstSeekOffset.containsKey(tp);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
index 3f212cc..97d358d 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
@@ -21,72 +21,69 @@ import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CON
 import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
 
-import java.io.IOException;
 import java.nio.file.Files;
+import java.util.Collections;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.MockTime;
 import kafka.utils.TestUtils;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.storm.testing.TmpPath;
 
 public class KafkaUnit {
+    private TestingServer zookeeper;
     private KafkaServer kafkaServer;
-    private EmbeddedZookeeper zkServer;
-    private ZkUtils zkUtils;
     private KafkaProducer<String, String> producer;
+    private AdminClient kafkaAdminClient;
     private TmpPath kafkaDir;
-    private static final String ZK_HOST = "127.0.0.1";
     private static final String KAFKA_HOST = "127.0.0.1";
     private static final int KAFKA_PORT = 9092;
 
     public KafkaUnit() {
     }
 
-    public void setUp() throws IOException {
+    public void setUp() throws Exception {
         // setup ZK
-        zkServer = new EmbeddedZookeeper();
-        String zkConnect = ZK_HOST + ":" + zkServer.port();
-        ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
-        zkUtils = ZkUtils.apply(zkClient, false);
+        zookeeper = new TestingServer(true);
 
         // setup Broker
         kafkaDir = new TmpPath(Files.createTempDirectory("kafka-").toAbsolutePath().toString());
         Properties brokerProps = new Properties();
-        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("zookeeper.connect", zookeeper.getConnectString());
         brokerProps.setProperty("broker.id", "0");
         brokerProps.setProperty("log.dirs", kafkaDir.getPath());
         brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT));
+        brokerProps.setProperty("offsets.topic.replication.factor", "1");
         KafkaConfig config = new KafkaConfig(brokerProps);
         MockTime mock = new MockTime();
         kafkaServer = TestUtils.createServer(config, mock);
 
         // setup default Producer
         createProducer();
+        kafkaAdminClient = AdminClient.create(Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT));
     }
 
-    public void tearDown() {
+    public void tearDown() throws Exception {
+        kafkaAdminClient.close();
         closeProducer();
         kafkaServer.shutdown();
         kafkaDir.close();
-        zkUtils.close();
-        zkServer.shutdown();
+        zookeeper.close();
     }
 
-    public void createTopic(String topicName) {
-        AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+    public void createTopic(String topicName) throws Exception {
+        kafkaAdminClient.createTopics(Collections.singleton(new NewTopic(topicName, 1, (short)1)))
+            .all()
+            .get(30, TimeUnit.SECONDS);
     }
 
     public int getKafkaPort() {
@@ -101,13 +98,7 @@ public class KafkaUnit {
         producer = new KafkaProducer<>(producerProps);
     }
 
-    public void createProducer(Serializer keySerializer, Serializer valueSerializer) {
-        Properties producerProps = new Properties();
-        producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT);
-        producer = new KafkaProducer<>(producerProps, keySerializer, valueSerializer);
-    }
-
-    public void sendMessage(ProducerRecord producerRecord) throws InterruptedException, ExecutionException, TimeoutException {
+    public void sendMessage(ProducerRecord<String, String> producerRecord) throws InterruptedException, ExecutionException, TimeoutException {
         producer.send(producerRecord).get(10, TimeUnit.SECONDS);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
index b05f132..d828873 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
@@ -36,7 +36,6 @@ import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfigur
 import org.apache.storm.kafka.spout.internal.ConsumerFactory;
 import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
 import org.apache.storm.kafka.spout.subscription.TopicAssigner;
-import org.apache.storm.kafka.spout.subscription.TopicAssigner;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Values;

http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index 1d2c171..15d2295 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -212,7 +212,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
 
             spout.nextTuple();
 
-            verify(consumerMock, never()).commitSync(argThat(arg -> {
+            verify(consumerMock, never()).commitSync(argThat((Map<TopicPartition, OffsetAndMetadata> arg) -> {
                 return !arg.containsKey(partition);
             }));
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
index 0f419be..a03da8a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
@@ -19,11 +19,7 @@ package org.apache.storm.kafka.spout.trident;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
-import java.util.List;
 import java.util.Map;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
 import org.json.simple.JSONValue;
 import org.junit.Test;
 
@@ -51,17 +47,4 @@ public class KafkaTridentSpoutBatchMetadataTest {
         assertThat(deserializedMetadata.getTopologyId(), is(metadata.getTopologyId()));
     }
 
-    @Test
-    public void testCreateMetadataFromRecords() {
-        long firstOffset = 15;
-        long lastOffset = 55;
-        String topologyId = "topologyId";
-        List<ConsumerRecord<String, String>> records = SpoutWithMockedConsumerSetupHelper.createRecords(new TopicPartition("test", 0), firstOffset, (int) (lastOffset - firstOffset + 1));
-
-        KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(records, topologyId);
-        assertThat("The first offset should be the first offset in the record set", metadata.getFirstOffset(), is(firstOffset));
-        assertThat("The last offset should be the last offset in the record set", metadata.getLastOffset(), is(lastOffset));
-        assertThat(metadata.getTopologyId(), is(topologyId));
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java
new file mode 100644
index 0000000..3692087
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java
@@ -0,0 +1,306 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
+import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.subscription.TopicAssigner;
+import org.apache.storm.kafka.spout.trident.config.builder.SingleTopicKafkaTridentSpoutConfiguration;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.topology.TransactionAttempt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaTridentSpoutEmitterEmitTest {
+
+    @Captor
+    public ArgumentCaptor<List<Object>> emitCaptor;
+
+    @Mock
+    public TopologyContext topologyContextMock;
+
+    @Mock
+    public TridentCollector collectorMock = mock(TridentCollector.class);
+
+    private final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
+    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+    private final String topologyId = "topologyId";
+    private final long firstOffsetInKafka = 0;
+    private final int recordsInKafka = 100;
+    private final long lastOffsetInKafka = 99;
+
+    @BeforeEach
+    public void setUp() {
+        when(topologyContextMock.getStormId()).thenReturn(topologyId);
+        consumer.assign(Collections.singleton(partition));
+        consumer.updateBeginningOffsets(Collections.singletonMap(partition, firstOffsetInKafka));
+        consumer.updateEndOffsets(Collections.singletonMap(partition, firstOffsetInKafka + recordsInKafka));
+        List<ConsumerRecord<String, String>> records = SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstOffsetInKafka, recordsInKafka);
+        records.forEach(record -> consumer.addRecord(record));
+    }
+
+    private KafkaTridentSpoutEmitter<String, String> createEmitter(FirstPollOffsetStrategy firstPollOffsetStrategy) {
+        return new KafkaTridentSpoutEmitter<>(
+            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+                .setRecordTranslator(r -> new Values(r.offset()), new Fields("offset"))
+                .setFirstPollOffsetStrategy(firstPollOffsetStrategy)
+                .setPollTimeoutMs(1)
+                .build(),
+            topologyContextMock,
+            config -> consumer, new TopicAssigner());
+    }
+
+    private Map<String, Object> doEmitNewBatchTest(FirstPollOffsetStrategy firstPollOffsetStrategy, TridentCollector collectorMock, TopicPartition tp, Map<String, Object> previousBatchMeta) {
+        KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(firstPollOffsetStrategy);
+
+        TransactionAttempt txid = new TransactionAttempt(10L, 0);
+        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
+        return emitter.emitPartitionBatchNew(txid, collectorMock, kttp, previousBatchMeta);
+    }
+
+    @Test
+    public void testEmitNewBatchWithNullMetaUncommittedEarliest() {
+        //Check that null meta makes the spout seek to EARLIEST, and that the returned meta is correct
+        Map<String, Object> batchMeta = doEmitNewBatchTest(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST, collectorMock, partition, null);
+
+        verify(collectorMock, times(recordsInKafka)).emit(emitCaptor.capture());
+        List<List<Object>> emits = emitCaptor.getAllValues();
+        assertThat(emits.get(0).get(0), is(firstOffsetInKafka));
+        assertThat(emits.get(emits.size() - 1).get(0), is(lastOffsetInKafka));
+        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
+        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffsetInKafka));
+        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
+    }
+
+    @Test
+    public void testEmitNewBatchWithNullMetaUncommittedLatest() {
+        //Check that null meta makes the spout seek to LATEST, and that the returned meta is correct
+        Map<String, Object> batchMeta = doEmitNewBatchTest(FirstPollOffsetStrategy.UNCOMMITTED_LATEST, collectorMock, partition, null);
+
+        verify(collectorMock, never()).emit(anyList());
+        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
+        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(lastOffsetInKafka));
+        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = FirstPollOffsetStrategy.class, names = {"EARLIEST", "LATEST"})
+    public void testEmitNewBatchWithPreviousMeta(FirstPollOffsetStrategy firstPollOffsetStrategy) {
+        //Check that non-null meta makes the spout seek according to the provided metadata, and that the returned meta is correct
+        long firstExpectedEmittedOffset = 50;
+        int expectedEmittedRecords = 50;
+        KafkaTridentSpoutBatchMetadata previousBatchMeta = new KafkaTridentSpoutBatchMetadata(firstOffsetInKafka, firstExpectedEmittedOffset - 1, topologyId);
+        Map<String, Object> batchMeta = doEmitNewBatchTest(firstPollOffsetStrategy, collectorMock, partition, previousBatchMeta.toMap());
+
+        verify(collectorMock, times(expectedEmittedRecords)).emit(emitCaptor.capture());
+        List<List<Object>> emits = emitCaptor.getAllValues();
+        assertThat(emits.get(0).get(0), is(firstExpectedEmittedOffset));
+        assertThat(emits.get(emits.size() - 1).get(0), is(lastOffsetInKafka));
+        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
+        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstExpectedEmittedOffset));
+        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
+    }
+
+    @Test
+    public void testEmitEmptyBatches() throws Exception {
+        //Check that the emitter can handle emitting empty batches on a new partition.
+        //If the spout is configured to seek to LATEST, or the partition is empty, the initial batches may be empty
+        KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.LATEST);
+        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
+        Map<String, Object> lastBatchMeta = null;
+        //Emit 10 empty batches, simulating no new records being present in Kafka
+        for (int i = 0; i < 10; i++) {
+            TransactionAttempt txid = new TransactionAttempt((long) i, 0);
+            lastBatchMeta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, lastBatchMeta);
+            KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(lastBatchMeta);
+            assertThat("Since the first poll strategy is LATEST, the meta should indicate that the last message has already been emitted", deserializedMeta.getFirstOffset(), is(lastOffsetInKafka));
+            assertThat("Since the first poll strategy is LATEST, the meta should indicate that the last message has already been emitted", deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
+        }
+        //Add new records to Kafka, and check that the next batch contains these records
+        long firstNewRecordOffset = lastOffsetInKafka + 1;
+        int numNewRecords = 10;
+        List<ConsumerRecord<String, String>> newRecords = SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstNewRecordOffset, numNewRecords);
+        newRecords.forEach(consumer::addRecord);
+        lastBatchMeta = emitter.emitPartitionBatchNew(new TransactionAttempt(11L, 0), collectorMock, kttp, lastBatchMeta);
+
+        verify(collectorMock, times(numNewRecords)).emit(emitCaptor.capture());
+        List<List<Object>> emits = emitCaptor.getAllValues();
+        assertThat(emits.get(0).get(0), is(firstNewRecordOffset));
+        assertThat(emits.get(emits.size() - 1).get(0), is(firstNewRecordOffset + numNewRecords - 1));
+        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(lastBatchMeta);
+        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstNewRecordOffset));
+        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstNewRecordOffset + numNewRecords - 1));
+    }
+
+    @Test
+    public void testReEmitBatch() {
+        //Check that a reemit emits exactly the same tuples as the last batch, even if Kafka returns more messages
+        long firstEmittedOffset = 50;
+        int numEmittedRecords = 10;
+        KafkaTridentSpoutBatchMetadata batchMeta = new KafkaTridentSpoutBatchMetadata(firstEmittedOffset, firstEmittedOffset + numEmittedRecords - 1, topologyId);
+        KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
+        TransactionAttempt txid = new TransactionAttempt(10L, 0);
+        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
+        emitter.reEmitPartitionBatch(txid, collectorMock, kttp, batchMeta.toMap());
+
+        verify(collectorMock, times(numEmittedRecords)).emit(emitCaptor.capture());
+        List<List<Object>> emits = emitCaptor.getAllValues();
+        assertThat(emits.get(0).get(0), is(firstEmittedOffset));
+        assertThat(emits.get(emits.size() - 1).get(0), is(firstEmittedOffset + numEmittedRecords - 1));
+    }
+
+    @Test
+    public void testReEmitBatchForOldTopologyWhenIgnoringCommittedOffsets() {
+        //In some cases users will want to drop retrying old batches, e.g. if the topology should start over from scratch.
+        //If the FirstPollOffsetStrategy ignores committed offsets, we should not retry batches for old topologies
+        //The batch retry should be skipped entirely
+        KafkaTridentSpoutBatchMetadata batchMeta = new KafkaTridentSpoutBatchMetadata(firstOffsetInKafka, lastOffsetInKafka, "a new storm id");
+        KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.EARLIEST);
+        TransactionAttempt txid = new TransactionAttempt(10L, 0);
+        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
+        emitter.reEmitPartitionBatch(txid, collectorMock, kttp, batchMeta.toMap());
+
+        verify(collectorMock, never()).emit(anyList());
+    }
+
+    @Test
+    public void testEmitEmptyFirstBatch() {
+        /**
+         * Check that when the first batch after a redeploy is empty, the emitter does not restart at the pre-redeploy offset. STORM-3279.
+         */
+        long firstEmittedOffset = 50;
+        int emittedRecords = 10;
+        KafkaTridentSpoutBatchMetadata preRedeployLastMeta = new KafkaTridentSpoutBatchMetadata(firstEmittedOffset, firstEmittedOffset + emittedRecords - 1, "an old topology");
+        KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.LATEST);
+        TransactionAttempt txid = new TransactionAttempt(0L, 0);
+        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
+        Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preRedeployLastMeta.toMap());
+
+        verify(collectorMock, never()).emit(anyList());
+        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(meta);
+        assertThat(deserializedMeta.getFirstOffset(), is(lastOffsetInKafka));
+        assertThat(deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
+
+        long firstNewRecordOffset = lastOffsetInKafka + 1;
+        int numNewRecords = 10;
+        List<ConsumerRecord<String, String>> newRecords = SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstNewRecordOffset, numNewRecords);
+        newRecords.forEach(consumer::addRecord);
+        meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, meta);
+
+        verify(collectorMock, times(numNewRecords)).emit(emitCaptor.capture());
+        List<List<Object>> emits = emitCaptor.getAllValues();
+        assertThat(emits.get(0).get(0), is(firstNewRecordOffset));
+        assertThat(emits.get(emits.size() - 1).get(0), is(firstNewRecordOffset + numNewRecords - 1));
+        deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(meta);
+        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstNewRecordOffset));
+        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstNewRecordOffset + numNewRecords - 1));
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = FirstPollOffsetStrategy.class, names = {"EARLIEST", "LATEST"})
+    public void testUnconditionalStrategyWhenSpoutWorkerIsRestarted(FirstPollOffsetStrategy firstPollOffsetStrategy) {
+        /**
+         * EARLIEST/LATEST should act like UNCOMMITTED_EARLIEST/LATEST if the emitter is new (e.g. the spout worker was restarted) but the
+         * topology has not been redeployed (storm id has not changed)
+         */
+        long preRestartEmittedOffset = 20;
+        int lastBatchEmittedRecords = 10;
+        int preRestartEmittedRecords = 30;
+        KafkaTridentSpoutBatchMetadata preExecutorRestartLastMeta = new KafkaTridentSpoutBatchMetadata(preRestartEmittedOffset, preRestartEmittedOffset + lastBatchEmittedRecords - 1, topologyId);
+        KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(firstPollOffsetStrategy);
+        TransactionAttempt txid = new TransactionAttempt(0L, 0);
+        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
+        Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
+
+        long firstEmittedOffset = preRestartEmittedOffset + lastBatchEmittedRecords;
+        int emittedRecords = recordsInKafka - preRestartEmittedRecords;
+        verify(collectorMock, times(emittedRecords)).emit(emitCaptor.capture());
+        List<List<Object>> emits = emitCaptor.getAllValues();
+        assertThat(emits.get(0).get(0), is(firstEmittedOffset));
+        assertThat(emits.get(emits.size() - 1).get(0), is(lastOffsetInKafka));
+        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(meta);
+        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstEmittedOffset));
+        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
+    }
+
+    @Test
+    public void testEarliestStrategyWhenTopologyIsRedeployed() {
+        /**
+         * EARLIEST should be applied if the emitter is new and the topology has been redeployed (storm id has changed)
+         */
+        long preRestartEmittedOffset = 20;
+        int preRestartEmittedRecords = 10;
+        KafkaTridentSpoutBatchMetadata preExecutorRestartLastMeta = new KafkaTridentSpoutBatchMetadata(preRestartEmittedOffset, preRestartEmittedOffset + preRestartEmittedRecords - 1, "Some older topology");
+        KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.EARLIEST);
+        TransactionAttempt txid = new TransactionAttempt(0L, 0);
+        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
+        Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
+
+        verify(collectorMock, times(recordsInKafka)).emit(emitCaptor.capture());
+        List<List<Object>> emits = emitCaptor.getAllValues();
+        assertThat(emits.get(0).get(0), is(firstOffsetInKafka));
+        assertThat(emits.get(emits.size() - 1).get(0), is(lastOffsetInKafka));
+        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(meta);
+        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffsetInKafka));
+        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
+    }
+
+    @Test
+    public void testLatestStrategyWhenTopologyIsRedeployed() {
+        /**
+         * LATEST should be applied if the emitter is new and the topology has been redeployed (storm id has changed)
+         */
+        long preRestartEmittedOffset = 20;
+        int preRestartEmittedRecords = 10;
+        KafkaTridentSpoutBatchMetadata preExecutorRestartLastMeta = new KafkaTridentSpoutBatchMetadata(preRestartEmittedOffset, preRestartEmittedOffset + preRestartEmittedRecords - 1, "Some older topology");
+        KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.LATEST);
+        TransactionAttempt txid = new TransactionAttempt(0L, 0);
+        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
+        Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
+
+        verify(collectorMock, never()).emit(anyList());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterPartitioningTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterPartitioningTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterPartitioningTest.java
new file mode 100644
index 0000000..7eb107d
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterPartitioningTest.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicAssigner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.apache.storm.kafka.spout.trident.config.builder.SingleTopicKafkaTridentSpoutConfiguration;
+import org.apache.storm.task.TopologyContext;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaTridentSpoutEmitterPartitioningTest {
+    
+    @Mock
+    public TopologyContext topologyContextMock;
+
+    private final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
+    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
+    
+    @Test
+    public void testGetOrderedPartitionsIsConsistent() {
+        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+                .build(),
+            topologyContextMock,
+            config -> consumer, new TopicAssigner());
+        
+        Set<TopicPartition> allPartitions = new HashSet<>();
+        int numPartitions = 10;
+        for (int i = 0; i < numPartitions; i++) {
+            allPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
+        }
+        List<Map<String, Object>> serializedPartitions = allPartitions.stream()
+            .map(tp -> tpSerializer.toMap(tp))
+            .collect(Collectors.toList());
+        
+        List<KafkaTridentSpoutTopicPartition> orderedPartitions = emitter.getOrderedPartitions(serializedPartitions);
+        assertThat("Should contain all partitions", orderedPartitions.size(), is(allPartitions.size()));
+        Collections.shuffle(serializedPartitions);
+        List<KafkaTridentSpoutTopicPartition> secondGetOrderedPartitions = emitter.getOrderedPartitions(serializedPartitions);
+        assertThat("Ordering must be consistent", secondGetOrderedPartitions, is(orderedPartitions));
+        
+        serializedPartitions.add(tpSerializer.toMap(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, numPartitions)));
+        List<KafkaTridentSpoutTopicPartition> orderedPartitionsWithNewPartition = emitter.getOrderedPartitions(serializedPartitions);
+        orderedPartitionsWithNewPartition.remove(orderedPartitionsWithNewPartition.size() - 1);
+        assertThat("Adding new partitions should not shuffle the existing ordering", orderedPartitionsWithNewPartition, is(orderedPartitions));
+    }
+    
+    @Test
+    public void testGetPartitionsForTask() {
+        //Verify correct wrapping/unwrapping of partition and delegation of partition assignment
+        ManualPartitioner partitionerMock = mock(ManualPartitioner.class);
+        when(partitionerMock.getPartitionsForThisTask(any(), any()))
+            .thenAnswer(invocation -> {
+                List<TopicPartition> partitions = new ArrayList<>(invocation.getArgument(0));
+                partitions.remove(0);
+                return new HashSet<>(partitions);
+            });
+        
+        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(mock(TopicFilter.class), partitionerMock, -1)
+                .build(),
+            topologyContextMock,
+            config -> consumer, new TopicAssigner());
+        
+        List<KafkaTridentSpoutTopicPartition> allPartitions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            allPartitions.add(new KafkaTridentSpoutTopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
+        }
+        List<TopicPartition> unwrappedPartitions = allPartitions.stream()
+            .map(kttp -> kttp.getTopicPartition())
+            .collect(Collectors.toList());
+        
+        List<KafkaTridentSpoutTopicPartition> partitionsForTask = emitter.getPartitionsForTask(0, 2, allPartitions);
+        verify(partitionerMock).getPartitionsForThisTask(eq(unwrappedPartitions), any(TopologyContext.class));
+        allPartitions.remove(0);
+        assertThat("Should have assigned all except the first partition to this task", new HashSet<>(partitionsForTask), is(new HashSet<>(allPartitions)));
+    }
+    
+    @Test
+    public void testAssignPartitions() {
+        //Verify correct unwrapping of partitions and delegation of assignment
+        TopicAssigner assignerMock = mock(TopicAssigner.class);
+        
+        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+                .build(),
+            topologyContextMock,
+            config -> consumer, assignerMock);
+        
+        List<KafkaTridentSpoutTopicPartition> allPartitions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            allPartitions.add(new KafkaTridentSpoutTopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
+        }
+        Set<TopicPartition> unwrappedPartitions = allPartitions.stream()
+            .map(kttp -> kttp.getTopicPartition())
+            .collect(Collectors.toSet());
+        
+        emitter.refreshPartitions(allPartitions);
+        
+        verify(assignerMock).assignPartitions(eq(consumer), eq(unwrappedPartitions), any(ConsumerRebalanceListener.class));
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
deleted file mode 100644
index 059c8c7..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.trident;
-
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.clearInvocations;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
-import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
-import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.ConsumerFactory;
-import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
-import org.apache.storm.kafka.spout.subscription.TopicAssigner;
-import org.apache.storm.kafka.spout.subscription.TopicFilter;
-import org.apache.storm.kafka.spout.trident.config.builder.SingleTopicKafkaTridentSpoutConfiguration;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.topology.TransactionAttempt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.InOrder;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
-
-public class KafkaTridentSpoutEmitterTest {
-    
-    @Rule
-    public MockitoRule mockito = MockitoJUnit.rule();
-    
-    @Captor
-    public ArgumentCaptor<List<Object>> emitCaptor;
-    
-    @Mock
-    public TopologyContext topologyContextMock;
-
-    private final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
-    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
-    private final String topologyId = "topologyId";
-
-    @Before
-    public void setUp() {
-        when(topologyContextMock.getStormId()).thenReturn(topologyId);
-    }
-    
-    @Test
-    public void testGetOrderedPartitionsIsConsistent() {
-        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
-            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
-                .build(),
-            topologyContextMock,
-            config -> consumer, new TopicAssigner());
-        
-        Set<TopicPartition> allPartitions = new HashSet<>();
-        int numPartitions = 10;
-        for (int i = 0; i < numPartitions; i++) {
-            allPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
-        }
-        List<Map<String, Object>> serializedPartitions = allPartitions.stream()
-            .map(tp -> tpSerializer.toMap(tp))
-            .collect(Collectors.toList());
-        
-        List<KafkaTridentSpoutTopicPartition> orderedPartitions = emitter.getOrderedPartitions(serializedPartitions);
-        assertThat("Should contain all partitions", orderedPartitions.size(), is(allPartitions.size()));
-        Collections.shuffle(serializedPartitions);
-        List<KafkaTridentSpoutTopicPartition> secondGetOrderedPartitions = emitter.getOrderedPartitions(serializedPartitions);
-        assertThat("Ordering must be consistent", secondGetOrderedPartitions, is(orderedPartitions));
-        
-        serializedPartitions.add(tpSerializer.toMap(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, numPartitions)));
-        List<KafkaTridentSpoutTopicPartition> orderedPartitionsWithNewPartition = emitter.getOrderedPartitions(serializedPartitions);
-        orderedPartitionsWithNewPartition.remove(orderedPartitionsWithNewPartition.size() - 1);
-        assertThat("Adding new partitions should not shuffle the existing ordering", orderedPartitionsWithNewPartition, is(orderedPartitions));
-    }
-    
-    @Test
-    public void testGetPartitionsForTask() {
-        //Verify correct wrapping/unwrapping of partition and delegation of partition assignment
-        ManualPartitioner partitionerMock = mock(ManualPartitioner.class);
-        when(partitionerMock.getPartitionsForThisTask(any(), any()))
-            .thenAnswer(invocation -> {
-                List<TopicPartition> partitions = new ArrayList<>((List<TopicPartition>) invocation.getArguments()[0]);
-                partitions.remove(0);
-                return new HashSet<>(partitions);
-            });
-        
-        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
-            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(mock(TopicFilter.class), partitionerMock, -1)
-                .build(),
-            topologyContextMock,
-            config -> consumer, new TopicAssigner());
-        
-        List<KafkaTridentSpoutTopicPartition> allPartitions = new ArrayList<>();
-        for (int i = 0; i < 10; i++) {
-            allPartitions.add(new KafkaTridentSpoutTopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
-        }
-        List<TopicPartition> unwrappedPartitions = allPartitions.stream()
-            .map(kttp -> kttp.getTopicPartition())
-            .collect(Collectors.toList());
-        
-        List<KafkaTridentSpoutTopicPartition> partitionsForTask = emitter.getPartitionsForTask(0, 2, allPartitions);
-        verify(partitionerMock).getPartitionsForThisTask(eq(unwrappedPartitions), any(TopologyContext.class));
-        allPartitions.remove(0);
-        assertThat("Should have assigned all except the first partition to this task", new HashSet<>(partitionsForTask), is(new HashSet<>(allPartitions)));
-    }
-    
-    @Test
-    public void testAssignPartitions() {
-        //Verify correct unwrapping of partitions and delegation of assignment
-        TopicAssigner assignerMock = mock(TopicAssigner.class);
-        
-        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
-            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
-                .build(),
-            topologyContextMock,
-            config -> consumer, assignerMock);
-        
-        List<KafkaTridentSpoutTopicPartition> allPartitions = new ArrayList<>();
-        for (int i = 0; i < 10; i++) {
-            allPartitions.add(new KafkaTridentSpoutTopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
-        }
-        Set<TopicPartition> unwrappedPartitions = allPartitions.stream()
-            .map(kttp -> kttp.getTopicPartition())
-            .collect(Collectors.toSet());
-        
-        emitter.refreshPartitions(allPartitions);
-        
-        verify(assignerMock).assignPartitions(eq(consumer), eq(unwrappedPartitions), any(ConsumerRebalanceListener.class));
-    }
-    
-    private KafkaTridentSpoutEmitter<String, String> createEmitterWithMessages(TopicPartition tp, long firstOffset, int numRecords, FirstPollOffsetStrategy firstPollOffsetStrategy) {
-        consumer.assign(Collections.singleton(tp));
-        //Pretend that the topic offsets start at 0, even if the batch should start with a later offset
-        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
-        List<ConsumerRecord<String, String>> records = SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, numRecords);
-        records.forEach(record -> consumer.addRecord(record));
-        return new KafkaTridentSpoutEmitter<>(
-            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
-                .setRecordTranslator(r -> new Values(r.offset()), new Fields("offset"))
-                .setFirstPollOffsetStrategy(firstPollOffsetStrategy)
-                .build(),
-            topologyContextMock,
-            config -> consumer, new TopicAssigner());
-    }
-    
-    private Map<String, Object> doEmitNewBatchTest(MockConsumer<String, String> consumer, TridentCollector collectorMock, TopicPartition tp, long firstOffset, int numRecords, Map<String, Object> previousBatchMeta) {
-        KafkaTridentSpoutEmitter<String, String> emitter = createEmitterWithMessages(tp, firstOffset, numRecords, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
-        
-        TransactionAttempt txid = new TransactionAttempt(10L, 0);
-        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
-        return emitter.emitPartitionBatchNew(txid, collectorMock, kttp, previousBatchMeta);
-    }
-    
-    @Test
-    public void testEmitNewBatchWithNullMeta() {
-        //Check that null meta makes the spout seek according to FirstPollOffsetStrategy, and that the returned meta is correct
-        TridentCollector collectorMock = mock(TridentCollector.class);
-        TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
-        long firstOffset = 0;
-        int numRecords = 10;
-        Map<String, Object> batchMeta = doEmitNewBatchTest(consumer, collectorMock, tp, firstOffset, numRecords, null);
-        
-        verify(collectorMock, times(numRecords)).emit(emitCaptor.capture());
-        List<List<Object>> emits = emitCaptor.getAllValues();
-        assertThat(emits.get(0).get(0), is(firstOffset));
-        assertThat(emits.get(emits.size() - 1).get(0), is(firstOffset + numRecords - 1));
-        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
-        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset));
-        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
-    }
-    
-    @Test
-    public void testEmitNewBatchWithPreviousMeta() {
-        //Check that non-null meta makes the spout seek according to the provided metadata, and that the returned meta is correct
-        TridentCollector collectorMock = mock(TridentCollector.class);
-        TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
-        long firstOffset = 50;
-        int numRecords = 10;
-        KafkaTridentSpoutBatchMetadata previousBatchMeta = new KafkaTridentSpoutBatchMetadata(0, firstOffset - 1, topologyId);
-        Map<String, Object> batchMeta = doEmitNewBatchTest(consumer, collectorMock, tp, firstOffset, numRecords, previousBatchMeta.toMap());
-        
-        verify(collectorMock, times(numRecords)).emit(emitCaptor.capture());
-        List<List<Object>> emits = emitCaptor.getAllValues();
-        assertThat(emits.get(0).get(0), is(firstOffset));
-        assertThat(emits.get(emits.size() - 1).get(0), is(firstOffset + numRecords - 1));
-        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
-        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset));
-        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
-    }
-    
-    @Test
-    public void testEmitEmptyBatches() throws Exception {
-        //Check that the emitter can handle emitting empty batches on a new partition.
-        //If the spout is configured to seek to LATEST, or the partition is empty, the initial batches may be empty
-        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
-        TridentCollector collectorMock = mock(TridentCollector.class);
-        TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
-        when(consumerMock.assignment()).thenReturn(Collections.singleton(tp));
-        ConsumerFactory<String, String> consumerFactory = spoutConfig -> consumerMock;
-        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
-            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
-                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
-                .build(),
-            mock(TopologyContext.class),
-            consumerFactory, new TopicAssigner());
-        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
-        Map<String, Object> lastBatchMeta = null;
-        //Emit 10 empty batches, simulating no new records being present in Kafka
-        for(int i = 0; i < 10; i++) {
-            clearInvocations(consumerMock);
-            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
-            TransactionAttempt txid = new TransactionAttempt((long) i, 0);
-            lastBatchMeta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, lastBatchMeta);
-            assertThat(lastBatchMeta, nullValue());
-            if (i == 0) {
-                InOrder inOrder = inOrder(consumerMock, collectorMock);
-                inOrder.verify(consumerMock).seekToEnd(Collections.singleton(tp));
-                inOrder.verify(consumerMock).poll(anyLong());
-            } else {
-                verify(consumerMock).poll(anyLong());
-            }
-        }
-        clearInvocations(consumerMock);
-        //Simulate that new records were added in Kafka, and check that the next batch contains these records
-        long firstOffset = 0;
-        int numRecords = 10;
-        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(
-            tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, numRecords))));
-        lastBatchMeta = emitter.emitPartitionBatchNew(new TransactionAttempt(11L, 0), collectorMock, kttp, lastBatchMeta);
-        
-        verify(consumerMock).poll(anyLong());
-        verify(collectorMock, times(numRecords)).emit(anyList());
-        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(lastBatchMeta);
-        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset));
-        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
-    }
-
-    @Test
-    public void testReEmitBatch() {
-        TridentCollector collectorMock = mock(TridentCollector.class);
-        TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
-        long firstOffset = 50;
-        int numRecordsEmitted = 10;
-        //Make sure the consumer can return extra records, so we test that re-emit only emits the original messages
-        int numRecordsPresent = 100;
-        KafkaTridentSpoutBatchMetadata batchMeta = new KafkaTridentSpoutBatchMetadata(firstOffset, firstOffset + numRecordsEmitted - 1, topologyId);
-        KafkaTridentSpoutEmitter<String, String> emitter = createEmitterWithMessages(tp, firstOffset, numRecordsPresent, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
-        TransactionAttempt txid = new TransactionAttempt(10L, 0);
-        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
-        emitter.reEmitPartitionBatch(txid, collectorMock, kttp, batchMeta.toMap());
-        
-        verify(collectorMock, times(numRecordsEmitted)).emit(emitCaptor.capture());
-        List<List<Object>> emits = emitCaptor.getAllValues();
-        assertThat(emits.get(0).get(0), is(firstOffset));
-        assertThat(emits.get(emits.size() - 1).get(0), is(firstOffset + numRecordsEmitted - 1));
-    }
-    
-    @Test
-    public void testReEmitBatchForOldTopologyWhenIgnoringCommittedOffsets() {
-        //In some cases users will want to drop retrying old batches, e.g. if the topology should start over from scratch.
-        //If the FirstPollOffsetStrategy ignores committed offsets, we should not retry batches for old topologies
-        //The batch retry should be skipped entirely
-        TridentCollector collectorMock = mock(TridentCollector.class);
-        TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
-        long firstOffset = 50;
-        int numRecordsEmitted = 10;
-        KafkaTridentSpoutBatchMetadata batchMeta = new KafkaTridentSpoutBatchMetadata(firstOffset, firstOffset + numRecordsEmitted - 1, "a new storm id");
-        KafkaTridentSpoutEmitter<String, String> emitter = createEmitterWithMessages(tp, firstOffset, numRecordsEmitted, FirstPollOffsetStrategy.EARLIEST);
-        TransactionAttempt txid = new TransactionAttempt(10L, 0);
-        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
-        emitter.reEmitPartitionBatch(txid, collectorMock, kttp, batchMeta.toMap());
-        
-        verify(collectorMock, never()).emit(anyList());
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9836fab..2bebb01 100644
--- a/pom.xml
+++ b/pom.xml
@@ -303,7 +303,7 @@
 
         <jackson.version>2.9.4</jackson.version>
         
-        <storm.kafka.client.version>0.10.1.0</storm.kafka.client.version>
+        <storm.kafka.client.version>0.11.0.0</storm.kafka.client.version>
 
         <!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->
         <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest, org.apache.storm.testing.PerformanceTest</java.unit.test.exclude>


[2/2] storm git commit: Merge branch 'STORM-2990' of https://github.com/srdo/storm into asfgit-master

Posted by sr...@apache.org.
Merge branch 'STORM-2990' of https://github.com/srdo/storm into asfgit-master


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e46a87a8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e46a87a8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e46a87a8

Branch: refs/heads/master
Commit: e46a87a862f4d9fc76358f76f9fb4aad5264b080
Parents: 43859e9 642ae36
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Dec 14 16:55:35 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Fri Dec 14 16:55:35 2018 +0100

----------------------------------------------------------------------
 external/storm-kafka-client/pom.xml             |   8 +
 .../trident/KafkaTridentSpoutBatchMetadata.java |  13 -
 .../spout/trident/KafkaTridentSpoutEmitter.java |  43 ++-
 .../java/org/apache/storm/kafka/KafkaUnit.java  |  49 ++-
 .../kafka/spout/KafkaSpoutAbstractTest.java     |   1 -
 .../spout/KafkaSpoutMessagingGuaranteeTest.java |   2 +-
 .../KafkaTridentSpoutBatchMetadataTest.java     |  17 -
 .../KafkaTridentSpoutEmitterEmitTest.java       | 306 ++++++++++++++++++
 ...afkaTridentSpoutEmitterPartitioningTest.java | 142 ++++++++
 .../trident/KafkaTridentSpoutEmitterTest.java   | 320 -------------------
 pom.xml                                         |   2 +-
 11 files changed, 508 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e46a87a8/pom.xml
----------------------------------------------------------------------