You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/09 08:57:58 UTC
[1/2] storm git commit: STORM-3046: Ensure KafkaTridentSpoutEmitter handles empty batches correctly when they occur at the beginning of the stream
Repository: storm
Updated Branches:
refs/heads/master 6872adfc2 -> afe35f318
STORM-3046: Ensure KafkaTridentSpoutEmitter handles empty batches correctly when they occur at the beginning of the stream
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0154216c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0154216c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0154216c
Branch: refs/heads/master
Commit: 0154216c9cb489b92ab86393fa1dd2fa5de56073
Parents: fdab9f9
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sat Apr 28 13:01:44 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sat Apr 28 13:01:44 2018 +0200
----------------------------------------------------------------------
.../spout/trident/KafkaTridentSpoutEmitter.java | 42 ++++++++++-------
.../trident/KafkaTridentSpoutEmitterTest.java | 49 ++++++++++++++++++++
2 files changed, 75 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0154216c/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index 86535be..27e75c2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -65,8 +65,8 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
private final TopicAssigner topicAssigner;
- // set of topic-partitions for which first poll has already occurred, and the first polled txid
- private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<>();
+ // The first seek offset for each topic partition, i.e. the offset this spout instance started processing at.
+ private final Map<TopicPartition, Long> tpToFirstSeekOffset = new HashMap<>();
private final long pollTimeoutMs;
private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
@@ -117,7 +117,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
// pause other topic-partitions to only poll from current topic-partition
pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
- seek(currBatchTp, lastBatchMeta, tx.getTransactionId());
+ seek(currBatchTp, lastBatchMeta);
final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
LOG.debug("Polled [{}] records from Kafka.", records.count());
@@ -149,17 +149,18 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
/**
* Determines the offset of the next fetch. Will use the firstPollOffsetStrategy if this is the first poll for the topic partition.
* Otherwise the next offset will be one past the last batch, based on lastBatchMeta.
- *
- * <p>lastBatchMeta should only be null when the previous txid was not emitted (e.g. new topic),
- * it is the first poll for the spout instance, or it is a replay of the first txid this spout emitted on this partition.
- * In the second case, there are either no previous transactions, or the MBC is still committing them
- * and they will fail because this spout did not emit the corresponding batches. If it had emitted them, the meta could not be null.
- * In any case, the lastBatchMeta should never be null if this is not the first poll for this spout instance.
+ *
+ * <p>lastBatchMeta should only be null in the following cases:
+ * <ul>
+ * <li>This is the first batch for this partition</li>
+ * <li>This is a replay of the first batch for this partition</li>
+ * <li>This is batch n for this partition, where batch 0...n-1 were all empty</li>
+ * </ul>
*
* @return the offset of the next fetch
*/
- private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta, long transactionId) {
- if (isFirstPoll(tp, transactionId)) {
+ private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
+ if (isFirstPoll(tp)) {
if (firstPollOffsetStrategy == EARLIEST) {
LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
kafkaConsumer.seekToBeginning(Collections.singleton(tp));
@@ -176,10 +177,20 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp);
kafkaConsumer.seekToEnd(Collections.singleton(tp));
}
- firstPollTransaction.put(tp, transactionId);
- } else {
+ tpToFirstSeekOffset.put(tp, kafkaConsumer.position(tp));
+ } else if (lastBatchMeta != null) {
kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch
LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
+ } else {
+ /*
+ * Last batch meta is null, but this is not the first batch emitted for this partition by this emitter instance.
+ * This is either a replay of the first batch for this partition, or all previous batches were empty,
+ * otherwise last batch meta could not be null. Use the offset the consumer started at.
+ */
+ long initialFetchOffset = tpToFirstSeekOffset.get(tp);
+ kafkaConsumer.seek(tp, initialFetchOffset);
+ LOG.debug("First poll for topic partition [{}], no last batch metadata present."
+ + " Using stored initial fetch offset [{}]", tp, initialFetchOffset);
}
final long fetchOffset = kafkaConsumer.position(tp);
@@ -187,9 +198,8 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
return fetchOffset;
}
- private boolean isFirstPoll(TopicPartition tp, long txid) {
- // The first poll is either the "real" first transaction, or a replay of the first transaction
- return !firstPollTransaction.containsKey(tp) || firstPollTransaction.get(tp) == txid;
+ private boolean isFirstPoll(TopicPartition tp) {
+ return !tpToFirstSeekOffset.containsKey(tp);
}
// returns paused topic-partitions.
http://git-wip-us.apache.org/repos/asf/storm/blob/0154216c/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
index 6208ce4..3be0488 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
@@ -16,12 +16,14 @@
package org.apache.storm.kafka.spout.trident;
+import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -211,4 +213,51 @@ public class KafkaTridentSpoutEmitterTest {
assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
}
+ @Test
+ public void testEmitEmptyBatches() throws Exception {
+ //Check that the emitter can handle emitting empty batches on a new partition.
+ //If the spout is configured to seek to LATEST, or the partition is empty, the initial batches may be empty
+ KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+ TridentCollector collectorMock = mock(TridentCollector.class);
+ TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+ when(consumerMock.assignment()).thenReturn(Collections.singleton(tp));
+ KafkaConsumerFactory<String, String> consumerFactory = spoutConfig -> consumerMock;
+ KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+ SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+ .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
+ .build(),
+ mock(TopologyContext.class),
+ consumerFactory, new TopicAssigner());
+ KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
+ Map<String, Object> lastBatchMeta = null;
+ //Emit 10 empty batches, simulating no new records being present in Kafka
+ for(int i = 0; i < 10; i++) {
+ clearInvocations(consumerMock);
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
+ TransactionAttempt txid = new TransactionAttempt((long) i, 0);
+ lastBatchMeta = emitter.emitPartitionBatch(txid, collectorMock, kttp, lastBatchMeta);
+ assertThat(lastBatchMeta, nullValue());
+ if (i == 0) {
+ InOrder inOrder = inOrder(consumerMock, collectorMock);
+ inOrder.verify(consumerMock).seekToEnd(Collections.singleton(tp));
+ inOrder.verify(consumerMock).poll(anyLong());
+ } else {
+ verify(consumerMock).poll(anyLong());
+ }
+ }
+ clearInvocations(consumerMock);
+ //Simulate that new records were added in Kafka, and check that the next batch contains these records
+ long firstOffset = 0;
+ int numRecords = 10;
+ when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(
+ tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, numRecords))));
+ lastBatchMeta = emitter.emitPartitionBatch(new TransactionAttempt(11L, 0), collectorMock, kttp, lastBatchMeta);
+
+ verify(consumerMock).poll(anyLong());
+ verify(collectorMock, times(numRecords)).emit(anyList());
+ KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(lastBatchMeta);
+ assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset));
+ assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
+ }
+
}
[2/2] storm git commit: Merge branch 'STORM-3046' of https://github.com/srdo/storm into STORM-3046-merge
Posted by ka...@apache.org.
Merge branch 'STORM-3046' of https://github.com/srdo/storm into STORM-3046-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/afe35f31
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/afe35f31
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/afe35f31
Branch: refs/heads/master
Commit: afe35f318489c86eca26064e3133e89b5591bf1f
Parents: 6872adf 0154216
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Jul 9 17:57:48 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Jul 9 17:57:48 2018 +0900
----------------------------------------------------------------------
.../spout/trident/KafkaTridentSpoutEmitter.java | 42 ++++++++++-------
.../trident/KafkaTridentSpoutEmitterTest.java | 49 ++++++++++++++++++++
2 files changed, 75 insertions(+), 16 deletions(-)
----------------------------------------------------------------------