You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this plain-text copy.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/09 08:57:58 UTC

[1/2] storm git commit: STORM-3046: Ensure KafkaTridentSpoutEmitter handles empty batches correctly when they occur at the beginning of the stream

Repository: storm
Updated Branches:
  refs/heads/master 6872adfc2 -> afe35f318


STORM-3046: Ensure KafkaTridentSpoutEmitter handles empty batches correctly when they occur at the beginning of the stream


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0154216c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0154216c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0154216c

Branch: refs/heads/master
Commit: 0154216c9cb489b92ab86393fa1dd2fa5de56073
Parents: fdab9f9
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sat Apr 28 13:01:44 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sat Apr 28 13:01:44 2018 +0200

----------------------------------------------------------------------
 .../spout/trident/KafkaTridentSpoutEmitter.java | 42 ++++++++++-------
 .../trident/KafkaTridentSpoutEmitterTest.java   | 49 ++++++++++++++++++++
 2 files changed, 75 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0154216c/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index 86535be..27e75c2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -65,8 +65,8 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
     private final TopicAssigner topicAssigner;
     
-    // set of topic-partitions for which first poll has already occurred, and the first polled txid
-    private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<>(); 
+    // The first seek offset for each topic partition, i.e. the offset this spout instance started processing at.
+    private final Map<TopicPartition, Long> tpToFirstSeekOffset = new HashMap<>(); 
 
     private final long pollTimeoutMs;
     private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
@@ -117,7 +117,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
                 // pause other topic-partitions to only poll from current topic-partition
                 pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
 
-                seek(currBatchTp, lastBatchMeta, tx.getTransactionId());
+                seek(currBatchTp, lastBatchMeta);
 
                 final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
                 LOG.debug("Polled [{}] records from Kafka.", records.count());
@@ -149,17 +149,18 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
     /**
      * Determines the offset of the next fetch. Will use the firstPollOffsetStrategy if this is the first poll for the topic partition.
      * Otherwise the next offset will be one past the last batch, based on lastBatchMeta.
-     * 
-     * <p>lastBatchMeta should only be null when the previous txid was not emitted (e.g. new topic),
-     * it is the first poll for the spout instance, or it is a replay of the first txid this spout emitted on this partition.
-     * In the second case, there are either no previous transactions, or the MBC is still committing them
-     * and they will fail because this spout did not emit the corresponding batches. If it had emitted them, the meta could not be null. 
-     * In any case, the lastBatchMeta should never be null if this is not the first poll for this spout instance.
+     *
+     * <p>lastBatchMeta should only be null in the following cases:
+     * <ul>
+     * <li>This is the first batch for this partition</li>
+     * <li>This is a replay of the first batch for this partition</li>
+     * <li>This is batch n for this partition, where batch 0...n-1 were all empty</li>
+     * </ul>
      *
      * @return the offset of the next fetch
      */
-    private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta, long transactionId) {
-        if (isFirstPoll(tp, transactionId)) {
+    private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
+        if (isFirstPoll(tp)) {
             if (firstPollOffsetStrategy == EARLIEST) {
                 LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
                 kafkaConsumer.seekToBeginning(Collections.singleton(tp));
@@ -176,10 +177,20 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
                 LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp);
                 kafkaConsumer.seekToEnd(Collections.singleton(tp));
             }
-            firstPollTransaction.put(tp, transactionId);
-        } else {
+            tpToFirstSeekOffset.put(tp, kafkaConsumer.position(tp));
+        } else if (lastBatchMeta != null) {
             kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
             LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
+        } else {
+            /*
+             * Last batch meta is null, but this is not the first batch emitted for this partition by this emitter instance.
+             * This is either a replay of the first batch for this partition, or all previous batches were empty,
+             * otherwise last batch meta could not be null. Use the offset the consumer started at. 
+             */
+            long initialFetchOffset = tpToFirstSeekOffset.get(tp);
+            kafkaConsumer.seek(tp, initialFetchOffset);
+            LOG.debug("First poll for topic partition [{}], no last batch metadata present."
+                + " Using stored initial fetch offset [{}]", tp, initialFetchOffset);
         }
 
         final long fetchOffset = kafkaConsumer.position(tp);
@@ -187,9 +198,8 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
         return fetchOffset;
     }
 
-    private boolean isFirstPoll(TopicPartition tp, long txid) {
-        // The first poll is either the "real" first transaction, or a replay of the first transaction
-        return !firstPollTransaction.containsKey(tp) || firstPollTransaction.get(tp) == txid;
+    private boolean isFirstPoll(TopicPartition tp) {
+        return !tpToFirstSeekOffset.containsKey(tp);
     }
 
     // returns paused topic-partitions.

http://git-wip-us.apache.org/repos/asf/storm/blob/0154216c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
index 6208ce4..3be0488 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
@@ -16,12 +16,14 @@
 
 package org.apache.storm.kafka.spout.trident;
 
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -211,4 +213,51 @@ public class KafkaTridentSpoutEmitterTest {
         assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
     }
     
+    @Test
+    public void testEmitEmptyBatches() throws Exception {
+        //Check that the emitter can handle emitting empty batches on a new partition.
+        //If the spout is configured to seek to LATEST, or the partition is empty, the initial batches may be empty
+        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+        TridentCollector collectorMock = mock(TridentCollector.class);
+        TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+        when(consumerMock.assignment()).thenReturn(Collections.singleton(tp));
+        KafkaConsumerFactory<String, String> consumerFactory = spoutConfig -> consumerMock;
+        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
+                .build(),
+            mock(TopologyContext.class),
+            consumerFactory, new TopicAssigner());
+        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
+        Map<String, Object> lastBatchMeta = null;
+        //Emit 10 empty batches, simulating no new records being present in Kafka
+        for(int i = 0; i < 10; i++) {
+            clearInvocations(consumerMock);
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
+            TransactionAttempt txid = new TransactionAttempt((long) i, 0);
+            lastBatchMeta = emitter.emitPartitionBatch(txid, collectorMock, kttp, lastBatchMeta);
+            assertThat(lastBatchMeta, nullValue());
+            if (i == 0) {
+                InOrder inOrder = inOrder(consumerMock, collectorMock);
+                inOrder.verify(consumerMock).seekToEnd(Collections.singleton(tp));
+                inOrder.verify(consumerMock).poll(anyLong());
+            } else {
+                verify(consumerMock).poll(anyLong());
+            }
+        }
+        clearInvocations(consumerMock);
+        //Simulate that new records were added in Kafka, and check that the next batch contains these records
+        long firstOffset = 0;
+        int numRecords = 10;
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(
+            tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, numRecords))));
+        lastBatchMeta = emitter.emitPartitionBatch(new TransactionAttempt(11L, 0), collectorMock, kttp, lastBatchMeta);
+        
+        verify(consumerMock).poll(anyLong());
+        verify(collectorMock, times(numRecords)).emit(anyList());
+        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(lastBatchMeta);
+        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset));
+        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
+    }
+    
 }


[2/2] storm git commit: Merge branch 'STORM-3046' of https://github.com/srdo/storm into STORM-3046-merge

Posted by ka...@apache.org.
Merge branch 'STORM-3046' of https://github.com/srdo/storm into STORM-3046-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/afe35f31
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/afe35f31
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/afe35f31

Branch: refs/heads/master
Commit: afe35f318489c86eca26064e3133e89b5591bf1f
Parents: 6872adf 0154216
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Jul 9 17:57:48 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jul 9 17:57:48 2018 +0900

----------------------------------------------------------------------
 .../spout/trident/KafkaTridentSpoutEmitter.java | 42 ++++++++++-------
 .../trident/KafkaTridentSpoutEmitterTest.java   | 49 ++++++++++++++++++++
 2 files changed, 75 insertions(+), 16 deletions(-)
----------------------------------------------------------------------