Posted to commits@storm.apache.org by sr...@apache.org on 2018/12/14 15:56:29 UTC
[1/2] storm git commit: STORM-2990, STORM-3279: Fix issue where Kafka Trident spout could ignore EARLIEST and LATEST, and make EARLIEST and LATEST only take effect on topology deploy
Repository: storm
Updated Branches:
refs/heads/master 43859e9e8 -> e46a87a86
STORM-2990, STORM-3279: Fix issue where Kafka Trident spout could ignore EARLIEST and LATEST, and make EARLIEST and LATEST only take effect on topology deploy
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/642ae369
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/642ae369
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/642ae369
Branch: refs/heads/master
Commit: 642ae3698ce4cc6186b7a694a0f523f5a773fa2e
Parents: 6d871a7
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Wed Nov 14 16:17:03 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Thu Nov 29 19:36:08 2018 +0100
----------------------------------------------------------------------
external/storm-kafka-client/pom.xml | 8 +
.../trident/KafkaTridentSpoutBatchMetadata.java | 13 -
.../spout/trident/KafkaTridentSpoutEmitter.java | 43 ++-
.../java/org/apache/storm/kafka/KafkaUnit.java | 49 ++-
.../kafka/spout/KafkaSpoutAbstractTest.java | 1 -
.../spout/KafkaSpoutMessagingGuaranteeTest.java | 2 +-
.../KafkaTridentSpoutBatchMetadataTest.java | 17 -
.../KafkaTridentSpoutEmitterEmitTest.java | 306 ++++++++++++++++++
...afkaTridentSpoutEmitterPartitioningTest.java | 142 ++++++++
.../trident/KafkaTridentSpoutEmitterTest.java | 320 -------------------
pom.xml | 2 +-
11 files changed, 508 insertions(+), 395 deletions(-)
----------------------------------------------------------------------
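The core behavioral change is in the emitter's seek logic: EARLIEST and LATEST now take effect only when the persisted batch metadata predates the current topology deployment, i.e. when the storm id recorded in the metadata differs from the running topology's id. A minimal sketch of that decision, adapted from the seek() hunk further down (names follow the diff; this is an illustration, not the full method):

    // Deploy-scoped strategy check introduced by this commit. lastBatchMeta is
    // the metadata Trident persisted for the previous batch; it now records the
    // storm id of the topology that emitted it.
    boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
        || !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
    if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
        consumer.seekToBeginning(Collections.singleton(tp)); // fresh deploy: start over
    } else if (firstPollOffsetStrategy == LATEST && isFirstPollSinceTopologyWasDeployed) {
        consumer.seekToEnd(Collections.singleton(tp)); // fresh deploy: skip to the end
    } else if (lastBatchMeta != null) {
        consumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // worker restart: resume
    }

A worker restart within the same deployment therefore resumes from the last batch metadata instead of re-applying EARLIEST/LATEST; the tests added below exercise both cases.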
http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index aa72396..b741c83 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -96,6 +96,10 @@
<artifactId>java-hamcrest</artifactId>
</dependency>
<dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<scope>test</scope>
@@ -133,6 +137,10 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
index f0d44b5..da37653 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
@@ -55,19 +55,6 @@ public class KafkaTridentSpoutBatchMetadata implements Serializable {
this.firstOffset = firstOffset;
this.lastOffset = lastOffset;
this.topologyId = topologyId;
- }
-
- /**
- * Builds a metadata object from a non-empty set of records.
- *
- * @param consumerRecords The non-empty set of records.
- */
- public <K, V> KafkaTridentSpoutBatchMetadata(List<ConsumerRecord<K, V>> consumerRecords, String topologyId) {
- Validate.isTrue(!consumerRecords.isEmpty(), "There must be at least one record in order to build metadata");
-
- firstOffset = consumerRecords.get(0).offset();
- lastOffset = consumerRecords.get(consumerRecords.size() - 1).offset();
- this.topologyId = topologyId;
LOG.debug("Created {}", this.toString());
}
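With the records-based constructor gone, metadata is always built from explicit first/last offsets plus the current storm id, and it still round-trips through a map for Trident's state storage. A small usage sketch against the API as it stands after this commit (offset values chosen for illustration):

    // Build and round-trip batch metadata the way the emitter and tests do.
    KafkaTridentSpoutBatchMetadata meta =
        new KafkaTridentSpoutBatchMetadata(15L, 55L, "topologyId");
    Map<String, Object> serialized = meta.toMap();
    KafkaTridentSpoutBatchMetadata restored = KafkaTridentSpoutBatchMetadata.fromMap(serialized);
    // restored.getFirstOffset() == 15, restored.getLastOffset() == 55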
http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index ed86136..aee8987 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -121,6 +121,13 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
long seekOffset = currBatchMeta.getFirstOffset();
+ if (seekOffset < 0 && currBatchMeta.getFirstOffset() == currBatchMeta.getLastOffset()) {
+ LOG.debug("Skipping re-emit of batch with negative starting offset."
+ + " The spout may set a negative starting offset for an empty batch that occurs at the start of a partition."
+ + " It is not expected that Trident will replay such an empty batch,"
+ + " but this guard is here in case it tries to do so. See STORM-2990, STORM-3279 for context.");
+ return;
+ }
LOG.debug("Seeking to offset [{}] for topic partition [{}]", seekOffset, currBatchTp);
consumer.seek(currBatchTp, seekOffset);
@@ -170,15 +177,25 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
seek(currBatchTp, lastBatchMeta);
- final ConsumerRecords<K, V> records = consumer.poll(pollTimeoutMs);
- LOG.debug("Polled [{}] records from Kafka.", records.count());
+ final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
+ LOG.debug("Polled [{}] records from Kafka.", records.size());
if (!records.isEmpty()) {
for (ConsumerRecord<K, V> record : records) {
emitTuple(collector, record);
}
- // build new metadata
- currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp), this.topologyContext.getStormId());
+ // build new metadata based on emitted records
+ currentBatch = new KafkaTridentSpoutBatchMetadata(
+ records.get(0).offset(),
+ records.get(records.size() - 1).offset(),
+ topologyContext.getStormId());
+ } else {
+ //Build new metadata based on the consumer position.
+ //We want the next emit to start at the current consumer position,
+ //so make a meta that indicates that position - 1 is the last emitted offset
+ //This helps us avoid cases like STORM-3279, and simplifies the seek logic.
+ long lastEmittedOffset = consumer.position(currBatchTp) - 1;
+ currentBatch = new KafkaTridentSpoutBatchMetadata(lastEmittedOffset, lastEmittedOffset, topologyContext.getStormId());
}
} finally {
consumer.resume(pausedTopicPartitions);
@@ -187,7 +204,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], "
+ "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
- return currentBatch == null ? null : currentBatch.toMap();
+ return currentBatch.toMap();
}
private boolean isFirstPollOffsetStrategyIgnoringCommittedOffsets() {
@@ -218,17 +235,18 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
* <ul>
* <li>This is the first batch for this partition</li>
* <li>This is a replay of the first batch for this partition</li>
- * <li>This is batch n for this partition, where batch 0...n-1 were all empty</li>
* </ul>
*
* @return the offset of the next fetch
*/
private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
- if (isFirstPoll(tp)) {
- if (firstPollOffsetStrategy == EARLIEST) {
+ if (isFirstPollSinceExecutorStarted(tp)) {
+ boolean isFirstPollSinceTopologyWasDeployed = lastBatchMeta == null
+ || !topologyContext.getStormId().equals(lastBatchMeta.getTopologyId());
+ if (firstPollOffsetStrategy == EARLIEST && isFirstPollSinceTopologyWasDeployed) {
LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
consumer.seekToBeginning(Collections.singleton(tp));
- } else if (firstPollOffsetStrategy == LATEST) {
+ } else if (firstPollOffsetStrategy == LATEST && isFirstPollSinceTopologyWasDeployed) {
LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
consumer.seekToEnd(Collections.singleton(tp));
} else if (lastBatchMeta != null) {
@@ -247,9 +265,8 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
} else {
/*
- * Last batch meta is null, but this is not the first batch emitted for this partition by this emitter instance. This is either
- * a replay of the first batch for this partition, or all previous batches were empty, otherwise last batch meta could not be
- * null. Use the offset the consumer started at.
+ * Last batch meta is null, but this is not the first batch emitted for this partition by this emitter instance. This is
+ * a replay of the first batch for this partition. Use the offset the consumer started at.
*/
long initialFetchOffset = tpToFirstSeekOffset.get(tp);
consumer.seek(tp, initialFetchOffset);
@@ -262,7 +279,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable {
return fetchOffset;
}
- private boolean isFirstPoll(TopicPartition tp) {
+ private boolean isFirstPollSinceExecutorStarted(TopicPartition tp) {
return !tpToFirstSeekOffset.containsKey(tp);
}
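Note how the empty-batch metadata above interacts with the negative-offset guard at the top of this diff: on a brand-new, empty partition the consumer position is 0, so the recorded offsets become -1, and a (not normally expected) replay of that batch hits the seekOffset < 0 guard and is skipped. A sketch of the scenario, with assumed values for illustration:

    // New partition with no records: position() returns 0.
    long lastEmittedOffset = consumer.position(tp) - 1; // -1
    KafkaTridentSpoutBatchMetadata emptyMeta = new KafkaTridentSpoutBatchMetadata(
        lastEmittedOffset, lastEmittedOffset, topologyContext.getStormId());
    // If Trident ever replays this batch, reEmitPartitionBatch sees
    // firstOffset == lastOffset == -1 and returns without emitting.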
http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
index 3f212cc..97d358d 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
@@ -21,72 +21,69 @@ import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CON
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import java.io.IOException;
import java.nio.file.Files;
+import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
-import org.I0Itec.zkclient.ZkClient;
+import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
import org.apache.storm.testing.TmpPath;
public class KafkaUnit {
+ private TestingServer zookeeper;
private KafkaServer kafkaServer;
- private EmbeddedZookeeper zkServer;
- private ZkUtils zkUtils;
private KafkaProducer<String, String> producer;
+ private AdminClient kafkaAdminClient;
private TmpPath kafkaDir;
- private static final String ZK_HOST = "127.0.0.1";
private static final String KAFKA_HOST = "127.0.0.1";
private static final int KAFKA_PORT = 9092;
public KafkaUnit() {
}
- public void setUp() throws IOException {
+ public void setUp() throws Exception {
// setup ZK
- zkServer = new EmbeddedZookeeper();
- String zkConnect = ZK_HOST + ":" + zkServer.port();
- ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
- zkUtils = ZkUtils.apply(zkClient, false);
+ zookeeper = new TestingServer(true);
// setup Broker
kafkaDir = new TmpPath(Files.createTempDirectory("kafka-").toAbsolutePath().toString());
Properties brokerProps = new Properties();
- brokerProps.setProperty("zookeeper.connect", zkConnect);
+ brokerProps.setProperty("zookeeper.connect", zookeeper.getConnectString());
brokerProps.setProperty("broker.id", "0");
brokerProps.setProperty("log.dirs", kafkaDir.getPath());
brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT));
+ brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);
MockTime mock = new MockTime();
kafkaServer = TestUtils.createServer(config, mock);
// setup default Producer
createProducer();
+ kafkaAdminClient = AdminClient.create(Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT));
}
- public void tearDown() {
+ public void tearDown() throws Exception {
+ kafkaAdminClient.close();
closeProducer();
kafkaServer.shutdown();
kafkaDir.close();
- zkUtils.close();
- zkServer.shutdown();
+ zookeeper.close();
}
- public void createTopic(String topicName) {
- AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+ public void createTopic(String topicName) throws Exception {
+ kafkaAdminClient.createTopics(Collections.singleton(new NewTopic(topicName, 1, (short)1)))
+ .all()
+ .get(30, TimeUnit.SECONDS);
}
public int getKafkaPort() {
@@ -101,13 +98,7 @@ public class KafkaUnit {
producer = new KafkaProducer<>(producerProps);
}
- public void createProducer(Serializer keySerializer, Serializer valueSerializer) {
- Properties producerProps = new Properties();
- producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT);
- producer = new KafkaProducer<>(producerProps, keySerializer, valueSerializer);
- }
-
- public void sendMessage(ProducerRecord producerRecord) throws InterruptedException, ExecutionException, TimeoutException {
+ public void sendMessage(ProducerRecord<String, String> producerRecord) throws InterruptedException, ExecutionException, TimeoutException {
producer.send(producerRecord).get(10, TimeUnit.SECONDS);
}
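The reworked harness is driven the same way as before; a typical test sequence, assuming the KafkaUnit API shown in this diff:

    // Start ZooKeeper (Curator TestingServer) and a single broker, create a
    // one-partition topic via AdminClient, publish a record, then shut down.
    KafkaUnit kafkaUnit = new KafkaUnit();
    kafkaUnit.setUp();
    kafkaUnit.createTopic("test-topic");
    kafkaUnit.sendMessage(new ProducerRecord<>("test-topic", "key", "value"));
    kafkaUnit.tearDown();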
http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
index b05f132..d828873 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
@@ -36,7 +36,6 @@ import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfigur
import org.apache.storm.kafka.spout.internal.ConsumerFactory;
import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
import org.apache.storm.kafka.spout.subscription.TopicAssigner;
-import org.apache.storm.kafka.spout.subscription.TopicAssigner;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Values;
http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index 1d2c171..15d2295 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -212,7 +212,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
spout.nextTuple();
- verify(consumerMock, never()).commitSync(argThat(arg -> {
+ verify(consumerMock, never()).commitSync(argThat((Map<TopicPartition, OffsetAndMetadata> arg) -> {
return !arg.containsKey(partition);
}));
}
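A likely reason for this one-line change (an assumption; the commit does not spell it out): consumer.commitSync is overloaded, so an untyped lambda inside argThat can leave the compiler unable to pick an overload. Spelling out the parameter type pins the Map-based overload:

    // Explicitly typed matcher lambda, as in the hunk above.
    verify(consumerMock, never()).commitSync(argThat(
        (Map<TopicPartition, OffsetAndMetadata> arg) -> !arg.containsKey(partition)));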
http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
index 0f419be..a03da8a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
@@ -19,11 +19,7 @@ package org.apache.storm.kafka.spout.trident;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-import java.util.List;
import java.util.Map;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
import org.json.simple.JSONValue;
import org.junit.Test;
@@ -51,17 +47,4 @@ public class KafkaTridentSpoutBatchMetadataTest {
assertThat(deserializedMetadata.getTopologyId(), is(metadata.getTopologyId()));
}
- @Test
- public void testCreateMetadataFromRecords() {
- long firstOffset = 15;
- long lastOffset = 55;
- String topologyId = "topologyId";
- List<ConsumerRecord<String, String>> records = SpoutWithMockedConsumerSetupHelper.createRecords(new TopicPartition("test", 0), firstOffset, (int) (lastOffset - firstOffset + 1));
-
- KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(records, topologyId);
- assertThat("The first offset should be the first offset in the record set", metadata.getFirstOffset(), is(firstOffset));
- assertThat("The last offset should be the last offset in the record set", metadata.getLastOffset(), is(lastOffset));
- assertThat(metadata.getTopologyId(), is(topologyId));
- }
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java
new file mode 100644
index 0000000..3692087
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java
@@ -0,0 +1,306 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
+import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.subscription.TopicAssigner;
+import org.apache.storm.kafka.spout.trident.config.builder.SingleTopicKafkaTridentSpoutConfiguration;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.topology.TransactionAttempt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaTridentSpoutEmitterEmitTest {
+
+ @Captor
+ public ArgumentCaptor<List<Object>> emitCaptor;
+
+ @Mock
+ public TopologyContext topologyContextMock;
+
+ @Mock
+ public TridentCollector collectorMock = mock(TridentCollector.class);
+
+ private final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
+ private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+ private final String topologyId = "topologyId";
+ private final long firstOffsetInKafka = 0;
+ private final int recordsInKafka = 100;
+ private final long lastOffsetInKafka = 99;
+
+ @BeforeEach
+ public void setUp() {
+ when(topologyContextMock.getStormId()).thenReturn(topologyId);
+ consumer.assign(Collections.singleton(partition));
+ consumer.updateBeginningOffsets(Collections.singletonMap(partition, firstOffsetInKafka));
+ consumer.updateEndOffsets(Collections.singletonMap(partition, firstOffsetInKafka + recordsInKafka));
+ List<ConsumerRecord<String, String>> records = SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstOffsetInKafka, recordsInKafka);
+ records.forEach(record -> consumer.addRecord(record));
+ }
+
+ private KafkaTridentSpoutEmitter<String, String> createEmitter(FirstPollOffsetStrategy firstPollOffsetStrategy) {
+ return new KafkaTridentSpoutEmitter<>(
+ SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+ .setRecordTranslator(r -> new Values(r.offset()), new Fields("offset"))
+ .setFirstPollOffsetStrategy(firstPollOffsetStrategy)
+ .setPollTimeoutMs(1)
+ .build(),
+ topologyContextMock,
+ config -> consumer, new TopicAssigner());
+ }
+
+ private Map<String, Object> doEmitNewBatchTest(FirstPollOffsetStrategy firstPollOffsetStrategy, TridentCollector collectorMock, TopicPartition tp, Map<String, Object> previousBatchMeta) {
+ KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(firstPollOffsetStrategy);
+
+ TransactionAttempt txid = new TransactionAttempt(10L, 0);
+ KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
+ return emitter.emitPartitionBatchNew(txid, collectorMock, kttp, previousBatchMeta);
+ }
+
+ @Test
+ public void testEmitNewBatchWithNullMetaUncommittedEarliest() {
+ //Check that null meta makes the spout seek to EARLIEST, and that the returned meta is correct
+ Map<String, Object> batchMeta = doEmitNewBatchTest(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST, collectorMock, partition, null);
+
+ verify(collectorMock, times(recordsInKafka)).emit(emitCaptor.capture());
+ List<List<Object>> emits = emitCaptor.getAllValues();
+ assertThat(emits.get(0).get(0), is(firstOffsetInKafka));
+ assertThat(emits.get(emits.size() - 1).get(0), is(lastOffsetInKafka));
+ KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
+ assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffsetInKafka));
+ assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
+ }
+
+ @Test
+ public void testEmitNewBatchWithNullMetaUncommittedLatest() {
+ //Check that null meta makes the spout seek to LATEST, and that the returned meta is correct
+ Map<String, Object> batchMeta = doEmitNewBatchTest(FirstPollOffsetStrategy.UNCOMMITTED_LATEST, collectorMock, partition, null);
+
+ verify(collectorMock, never()).emit(anyList());
+ KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
+ assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(lastOffsetInKafka));
+ assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = FirstPollOffsetStrategy.class, names = {"EARLIEST", "LATEST"})
+ public void testEmitNewBatchWithPreviousMeta(FirstPollOffsetStrategy firstPollOffsetStrategy) {
+ //Check that non-null meta makes the spout seek according to the provided metadata, and that the returned meta is correct
+ long firstExpectedEmittedOffset = 50;
+ int expectedEmittedRecords = 50;
+ KafkaTridentSpoutBatchMetadata previousBatchMeta = new KafkaTridentSpoutBatchMetadata(firstOffsetInKafka, firstExpectedEmittedOffset - 1, topologyId);
+ Map<String, Object> batchMeta = doEmitNewBatchTest(firstPollOffsetStrategy, collectorMock, partition, previousBatchMeta.toMap());
+
+ verify(collectorMock, times(expectedEmittedRecords)).emit(emitCaptor.capture());
+ List<List<Object>> emits = emitCaptor.getAllValues();
+ assertThat(emits.get(0).get(0), is(firstExpectedEmittedOffset));
+ assertThat(emits.get(emits.size() - 1).get(0), is(lastOffsetInKafka));
+ KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
+ assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstExpectedEmittedOffset));
+ assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
+ }
+
+ @Test
+ public void testEmitEmptyBatches() throws Exception {
+ //Check that the emitter can handle emitting empty batches on a new partition.
+ //If the spout is configured to seek to LATEST, or the partition is empty, the initial batches may be empty
+ KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.LATEST);
+ KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
+ Map<String, Object> lastBatchMeta = null;
+ //Emit 10 empty batches, simulating no new records being present in Kafka
+ for (int i = 0; i < 10; i++) {
+ TransactionAttempt txid = new TransactionAttempt((long) i, 0);
+ lastBatchMeta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, lastBatchMeta);
+ KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(lastBatchMeta);
+ assertThat("Since the first poll strategy is LATEST, the meta should indicate that the last message has already been emitted", deserializedMeta.getFirstOffset(), is(lastOffsetInKafka));
+ assertThat("Since the first poll strategy is LATEST, the meta should indicate that the last message has already been emitted", deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
+ }
+ //Add new records to Kafka, and check that the next batch contains these records
+ long firstNewRecordOffset = lastOffsetInKafka + 1;
+ int numNewRecords = 10;
+ List<ConsumerRecord<String, String>> newRecords = SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstNewRecordOffset, numNewRecords);
+ newRecords.forEach(consumer::addRecord);
+ lastBatchMeta = emitter.emitPartitionBatchNew(new TransactionAttempt(11L, 0), collectorMock, kttp, lastBatchMeta);
+
+ verify(collectorMock, times(numNewRecords)).emit(emitCaptor.capture());
+ List<List<Object>> emits = emitCaptor.getAllValues();
+ assertThat(emits.get(0).get(0), is(firstNewRecordOffset));
+ assertThat(emits.get(emits.size() - 1).get(0), is(firstNewRecordOffset + numNewRecords - 1));
+ KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(lastBatchMeta);
+ assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstNewRecordOffset));
+ assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstNewRecordOffset + numNewRecords - 1));
+ }
+
+ @Test
+ public void testReEmitBatch() {
+ //Check that a reemit emits exactly the same tuples as the last batch, even if Kafka returns more messages
+ long firstEmittedOffset = 50;
+ int numEmittedRecords = 10;
+ KafkaTridentSpoutBatchMetadata batchMeta = new KafkaTridentSpoutBatchMetadata(firstEmittedOffset, firstEmittedOffset + numEmittedRecords - 1, topologyId);
+ KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
+ TransactionAttempt txid = new TransactionAttempt(10L, 0);
+ KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
+ emitter.reEmitPartitionBatch(txid, collectorMock, kttp, batchMeta.toMap());
+
+ verify(collectorMock, times(numEmittedRecords)).emit(emitCaptor.capture());
+ List<List<Object>> emits = emitCaptor.getAllValues();
+ assertThat(emits.get(0).get(0), is(firstEmittedOffset));
+ assertThat(emits.get(emits.size() - 1).get(0), is(firstEmittedOffset + numEmittedRecords - 1));
+ }
+
+ @Test
+ public void testReEmitBatchForOldTopologyWhenIgnoringCommittedOffsets() {
+ //In some cases users will want retries of old batches to be dropped, e.g. if the topology should start over from scratch.
+ //If the FirstPollOffsetStrategy ignores committed offsets, we should not retry batches for old topologies;
+ //the batch retry should be skipped entirely
+ KafkaTridentSpoutBatchMetadata batchMeta = new KafkaTridentSpoutBatchMetadata(firstOffsetInKafka, lastOffsetInKafka, "a new storm id");
+ KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.EARLIEST);
+ TransactionAttempt txid = new TransactionAttempt(10L, 0);
+ KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
+ emitter.reEmitPartitionBatch(txid, collectorMock, kttp, batchMeta.toMap());
+
+ verify(collectorMock, never()).emit(anyList());
+ }
+
+ @Test
+ public void testEmitEmptyFirstBatch() {
+ /**
+ * Check that when the first batch after a redeploy is empty, the emitter does not restart at the pre-redeploy offset. STORM-3279.
+ */
+ long firstEmittedOffset = 50;
+ int emittedRecords = 10;
+ KafkaTridentSpoutBatchMetadata preRedeployLastMeta = new KafkaTridentSpoutBatchMetadata(firstEmittedOffset, firstEmittedOffset + emittedRecords - 1, "an old topology");
+ KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.LATEST);
+ TransactionAttempt txid = new TransactionAttempt(0L, 0);
+ KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
+ Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preRedeployLastMeta.toMap());
+
+ verify(collectorMock, never()).emit(anyList());
+ KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(meta);
+ assertThat(deserializedMeta.getFirstOffset(), is(lastOffsetInKafka));
+ assertThat(deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
+
+ long firstNewRecordOffset = lastOffsetInKafka + 1;
+ int numNewRecords = 10;
+ List<ConsumerRecord<String, String>> newRecords = SpoutWithMockedConsumerSetupHelper.createRecords(partition, firstNewRecordOffset, numNewRecords);
+ newRecords.forEach(consumer::addRecord);
+ meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, meta);
+
+ verify(collectorMock, times(numNewRecords)).emit(emitCaptor.capture());
+ List<List<Object>> emits = emitCaptor.getAllValues();
+ assertThat(emits.get(0).get(0), is(firstNewRecordOffset));
+ assertThat(emits.get(emits.size() - 1).get(0), is(firstNewRecordOffset + numNewRecords - 1));
+ deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(meta);
+ assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstNewRecordOffset));
+ assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstNewRecordOffset + numNewRecords - 1));
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = FirstPollOffsetStrategy.class, names = {"EARLIEST", "LATEST"})
+ public void testUnconditionalStrategyWhenSpoutWorkerIsRestarted(FirstPollOffsetStrategy firstPollOffsetStrategy) {
+ /**
+ * EARLIEST/LATEST should act like UNCOMMITTED_EARLIEST/LATEST if the emitter is new but the topology has not restarted (storm id
+ * has not changed)
+ */
+ long preRestartEmittedOffset = 20;
+ int lastBatchEmittedRecords = 10;
+ int preRestartEmittedRecords = 30;
+ KafkaTridentSpoutBatchMetadata preExecutorRestartLastMeta = new KafkaTridentSpoutBatchMetadata(preRestartEmittedOffset, preRestartEmittedOffset + lastBatchEmittedRecords - 1, topologyId);
+ KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(firstPollOffsetStrategy);
+ TransactionAttempt txid = new TransactionAttempt(0L, 0);
+ KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
+ Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
+
+ long firstEmittedOffset = preRestartEmittedOffset + lastBatchEmittedRecords;
+ int emittedRecords = recordsInKafka - preRestartEmittedRecords;
+ verify(collectorMock, times(emittedRecords)).emit(emitCaptor.capture());
+ List<List<Object>> emits = emitCaptor.getAllValues();
+ assertThat(emits.get(0).get(0), is(firstEmittedOffset));
+ assertThat(emits.get(emits.size() - 1).get(0), is(lastOffsetInKafka));
+ KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(meta);
+ assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstEmittedOffset));
+ assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
+ }
+
+ @Test
+ public void testEarliestStrategyWhenTopologyIsRedeployed() {
+ /**
+ * EARLIEST should be applied if the emitter is new and the topology has been redeployed (storm id has changed)
+ */
+ long preRestartEmittedOffset = 20;
+ int preRestartEmittedRecords = 10;
+ KafkaTridentSpoutBatchMetadata preExecutorRestartLastMeta = new KafkaTridentSpoutBatchMetadata(preRestartEmittedOffset, preRestartEmittedOffset + preRestartEmittedRecords - 1, "Some older topology");
+ KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.EARLIEST);
+ TransactionAttempt txid = new TransactionAttempt(0L, 0);
+ KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
+ Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
+
+ verify(collectorMock, times(recordsInKafka)).emit(emitCaptor.capture());
+ List<List<Object>> emits = emitCaptor.getAllValues();
+ assertThat(emits.get(0).get(0), is(firstOffsetInKafka));
+ assertThat(emits.get(emits.size() - 1).get(0), is(lastOffsetInKafka));
+ KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(meta);
+ assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffsetInKafka));
+ assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(lastOffsetInKafka));
+ }
+
+ @Test
+ public void testLatestStrategyWhenTopologyIsRedeployed() {
+ /**
+ * LATEST should be applied if the emitter is new and the topology has been redeployed (storm id has changed)
+ */
+ long preRestartEmittedOffset = 20;
+ int preRestartEmittedRecords = 10;
+ KafkaTridentSpoutBatchMetadata preExecutorRestartLastMeta = new KafkaTridentSpoutBatchMetadata(preRestartEmittedOffset, preRestartEmittedOffset + preRestartEmittedRecords - 1, "Some older topology");
+ KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(FirstPollOffsetStrategy.LATEST);
+ TransactionAttempt txid = new TransactionAttempt(0L, 0);
+ KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
+ Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, preExecutorRestartLastMeta.toMap());
+
+ verify(collectorMock, never()).emit(anyList());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterPartitioningTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterPartitioningTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterPartitioningTest.java
new file mode 100644
index 0000000..7eb107d
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterPartitioningTest.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicAssigner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.apache.storm.kafka.spout.trident.config.builder.SingleTopicKafkaTridentSpoutConfiguration;
+import org.apache.storm.task.TopologyContext;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaTridentSpoutEmitterPartitioningTest {
+
+ @Mock
+ public TopologyContext topologyContextMock;
+
+ private final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
+ private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
+
+ @Test
+ public void testGetOrderedPartitionsIsConsistent() {
+ KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+ SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+ .build(),
+ topologyContextMock,
+ config -> consumer, new TopicAssigner());
+
+ Set<TopicPartition> allPartitions = new HashSet<>();
+ int numPartitions = 10;
+ for (int i = 0; i < numPartitions; i++) {
+ allPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
+ }
+ List<Map<String, Object>> serializedPartitions = allPartitions.stream()
+ .map(tp -> tpSerializer.toMap(tp))
+ .collect(Collectors.toList());
+
+ List<KafkaTridentSpoutTopicPartition> orderedPartitions = emitter.getOrderedPartitions(serializedPartitions);
+ assertThat("Should contain all partitions", orderedPartitions.size(), is(allPartitions.size()));
+ Collections.shuffle(serializedPartitions);
+ List<KafkaTridentSpoutTopicPartition> secondGetOrderedPartitions = emitter.getOrderedPartitions(serializedPartitions);
+ assertThat("Ordering must be consistent", secondGetOrderedPartitions, is(orderedPartitions));
+
+ serializedPartitions.add(tpSerializer.toMap(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, numPartitions)));
+ List<KafkaTridentSpoutTopicPartition> orderedPartitionsWithNewPartition = emitter.getOrderedPartitions(serializedPartitions);
+ orderedPartitionsWithNewPartition.remove(orderedPartitionsWithNewPartition.size() - 1);
+ assertThat("Adding new partitions should not shuffle the existing ordering", orderedPartitionsWithNewPartition, is(orderedPartitions));
+ }
+
+ @Test
+ public void testGetPartitionsForTask() {
+ //Verify correct wrapping/unwrapping of partition and delegation of partition assignment
+ ManualPartitioner partitionerMock = mock(ManualPartitioner.class);
+ when(partitionerMock.getPartitionsForThisTask(any(), any()))
+ .thenAnswer(invocation -> {
+ List<TopicPartition> partitions = new ArrayList<>(invocation.getArgument(0));
+ partitions.remove(0);
+ return new HashSet<>(partitions);
+ });
+
+ KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+ SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(mock(TopicFilter.class), partitionerMock, -1)
+ .build(),
+ topologyContextMock,
+ config -> consumer, new TopicAssigner());
+
+ List<KafkaTridentSpoutTopicPartition> allPartitions = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ allPartitions.add(new KafkaTridentSpoutTopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
+ }
+ List<TopicPartition> unwrappedPartitions = allPartitions.stream()
+ .map(kttp -> kttp.getTopicPartition())
+ .collect(Collectors.toList());
+
+ List<KafkaTridentSpoutTopicPartition> partitionsForTask = emitter.getPartitionsForTask(0, 2, allPartitions);
+ verify(partitionerMock).getPartitionsForThisTask(eq(unwrappedPartitions), any(TopologyContext.class));
+ allPartitions.remove(0);
+ assertThat("Should have assigned all except the first partition to this task", new HashSet<>(partitionsForTask), is(new HashSet<>(allPartitions)));
+ }
+
+ @Test
+ public void testAssignPartitions() {
+ //Verify correct unwrapping of partitions and delegation of assignment
+ TopicAssigner assignerMock = mock(TopicAssigner.class);
+
+ KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+ SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+ .build(),
+ topologyContextMock,
+ config -> consumer, assignerMock);
+
+ List<KafkaTridentSpoutTopicPartition> allPartitions = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ allPartitions.add(new KafkaTridentSpoutTopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
+ }
+ Set<TopicPartition> unwrappedPartitions = allPartitions.stream()
+ .map(kttp -> kttp.getTopicPartition())
+ .collect(Collectors.toSet());
+
+ emitter.refreshPartitions(allPartitions);
+
+ verify(assignerMock).assignPartitions(eq(consumer), eq(unwrappedPartitions), any(ConsumerRebalanceListener.class));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
deleted file mode 100644
index 059c8c7..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.trident;
-
-import static org.hamcrest.CoreMatchers.nullValue;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.clearInvocations;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
-import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
-import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.ConsumerFactory;
-import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
-import org.apache.storm.kafka.spout.subscription.TopicAssigner;
-import org.apache.storm.kafka.spout.subscription.TopicFilter;
-import org.apache.storm.kafka.spout.trident.config.builder.SingleTopicKafkaTridentSpoutConfiguration;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.topology.TransactionAttempt;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.InOrder;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
-
-public class KafkaTridentSpoutEmitterTest {
-
- @Rule
- public MockitoRule mockito = MockitoJUnit.rule();
-
- @Captor
- public ArgumentCaptor<List<Object>> emitCaptor;
-
- @Mock
- public TopologyContext topologyContextMock;
-
- private final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
- private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
- private final String topologyId = "topologyId";
-
- @Before
- public void setUp() {
- when(topologyContextMock.getStormId()).thenReturn(topologyId);
- }
-
- @Test
- public void testGetOrderedPartitionsIsConsistent() {
- KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
- SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
- .build(),
- topologyContextMock,
- config -> consumer, new TopicAssigner());
-
- Set<TopicPartition> allPartitions = new HashSet<>();
- int numPartitions = 10;
- for (int i = 0; i < numPartitions; i++) {
- allPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
- }
- List<Map<String, Object>> serializedPartitions = allPartitions.stream()
- .map(tp -> tpSerializer.toMap(tp))
- .collect(Collectors.toList());
-
- List<KafkaTridentSpoutTopicPartition> orderedPartitions = emitter.getOrderedPartitions(serializedPartitions);
- assertThat("Should contain all partitions", orderedPartitions.size(), is(allPartitions.size()));
- Collections.shuffle(serializedPartitions);
- List<KafkaTridentSpoutTopicPartition> secondGetOrderedPartitions = emitter.getOrderedPartitions(serializedPartitions);
- assertThat("Ordering must be consistent", secondGetOrderedPartitions, is(orderedPartitions));
-
- serializedPartitions.add(tpSerializer.toMap(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, numPartitions)));
- List<KafkaTridentSpoutTopicPartition> orderedPartitionsWithNewPartition = emitter.getOrderedPartitions(serializedPartitions);
- orderedPartitionsWithNewPartition.remove(orderedPartitionsWithNewPartition.size() - 1);
- assertThat("Adding new partitions should not shuffle the existing ordering", orderedPartitionsWithNewPartition, is(orderedPartitions));
- }
-
- @Test
- public void testGetPartitionsForTask() {
- //Verify correct wrapping/unwrapping of partition and delegation of partition assignment
- ManualPartitioner partitionerMock = mock(ManualPartitioner.class);
- when(partitionerMock.getPartitionsForThisTask(any(), any()))
- .thenAnswer(invocation -> {
- List<TopicPartition> partitions = new ArrayList<>((List<TopicPartition>) invocation.getArguments()[0]);
- partitions.remove(0);
- return new HashSet<>(partitions);
- });
-
- KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
- SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(mock(TopicFilter.class), partitionerMock, -1)
- .build(),
- topologyContextMock,
- config -> consumer, new TopicAssigner());
-
- List<KafkaTridentSpoutTopicPartition> allPartitions = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- allPartitions.add(new KafkaTridentSpoutTopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
- }
- List<TopicPartition> unwrappedPartitions = allPartitions.stream()
- .map(kttp -> kttp.getTopicPartition())
- .collect(Collectors.toList());
-
- List<KafkaTridentSpoutTopicPartition> partitionsForTask = emitter.getPartitionsForTask(0, 2, allPartitions);
- verify(partitionerMock).getPartitionsForThisTask(eq(unwrappedPartitions), any(TopologyContext.class));
- allPartitions.remove(0);
- assertThat("Should have assigned all except the first partition to this task", new HashSet<>(partitionsForTask), is(new HashSet<>(allPartitions)));
- }
-
- @Test
- public void testAssignPartitions() {
- //Verify correct unwrapping of partitions and delegation of assignment
- TopicAssigner assignerMock = mock(TopicAssigner.class);
-
- KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
- SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
- .build(),
- topologyContextMock,
- config -> consumer, assignerMock);
-
- List<KafkaTridentSpoutTopicPartition> allPartitions = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- allPartitions.add(new KafkaTridentSpoutTopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
- }
- Set<TopicPartition> unwrappedPartitions = allPartitions.stream()
- .map(kttp -> kttp.getTopicPartition())
- .collect(Collectors.toSet());
-
- emitter.refreshPartitions(allPartitions);
-
- verify(assignerMock).assignPartitions(eq(consumer), eq(unwrappedPartitions), any(ConsumerRebalanceListener.class));
- }
-
- private KafkaTridentSpoutEmitter<String, String> createEmitterWithMessages(TopicPartition tp, long firstOffset, int numRecords, FirstPollOffsetStrategy firstPollOffsetStrategy) {
- consumer.assign(Collections.singleton(tp));
- //Pretend that the topic offsets start at 0, even if the batch should start with a later offset
- consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
- List<ConsumerRecord<String, String>> records = SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, numRecords);
- records.forEach(record -> consumer.addRecord(record));
- return new KafkaTridentSpoutEmitter<>(
- SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
- .setRecordTranslator(r -> new Values(r.offset()), new Fields("offset"))
- .setFirstPollOffsetStrategy(firstPollOffsetStrategy)
- .build(),
- topologyContextMock,
- config -> consumer, new TopicAssigner());
- }
-
- private Map<String, Object> doEmitNewBatchTest(MockConsumer<String, String> consumer, TridentCollector collectorMock, TopicPartition tp, long firstOffset, int numRecords, Map<String, Object> previousBatchMeta) {
- KafkaTridentSpoutEmitter<String, String> emitter = createEmitterWithMessages(tp, firstOffset, numRecords, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
-
- TransactionAttempt txid = new TransactionAttempt(10L, 0);
- KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
- return emitter.emitPartitionBatchNew(txid, collectorMock, kttp, previousBatchMeta);
- }
-
- @Test
- public void testEmitNewBatchWithNullMeta() {
- //Check that null meta makes the spout seek according to FirstPollOffsetStrategy, and that the returned meta is correct
- TridentCollector collectorMock = mock(TridentCollector.class);
- TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
- long firstOffset = 0;
- int numRecords = 10;
- Map<String, Object> batchMeta = doEmitNewBatchTest(consumer, collectorMock, tp, firstOffset, numRecords, null);
-
- verify(collectorMock, times(numRecords)).emit(emitCaptor.capture());
- List<List<Object>> emits = emitCaptor.getAllValues();
- assertThat(emits.get(0).get(0), is(firstOffset));
- assertThat(emits.get(emits.size() - 1).get(0), is(firstOffset + numRecords - 1));
- KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
- assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset));
- assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
- }
-
- @Test
- public void testEmitNewBatchWithPreviousMeta() {
- //Check that non-null meta makes the spout seek according to the provided metadata, and that the returned meta is correct
- TridentCollector collectorMock = mock(TridentCollector.class);
- TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
- long firstOffset = 50;
- int numRecords = 10;
- KafkaTridentSpoutBatchMetadata previousBatchMeta = new KafkaTridentSpoutBatchMetadata(0, firstOffset - 1, topologyId);
- Map<String, Object> batchMeta = doEmitNewBatchTest(consumer, collectorMock, tp, firstOffset, numRecords, previousBatchMeta.toMap());
-
- verify(collectorMock, times(numRecords)).emit(emitCaptor.capture());
- List<List<Object>> emits = emitCaptor.getAllValues();
- assertThat(emits.get(0).get(0), is(firstOffset));
- assertThat(emits.get(emits.size() - 1).get(0), is(firstOffset + numRecords - 1));
- KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
- assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset));
- assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
- }
-
- @Test
- public void testEmitEmptyBatches() throws Exception {
- //Check that the emitter can handle emitting empty batches on a new partition.
- //If the spout is configured to seek to LATEST, or the partition is empty, the initial batches may be empty
- KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
- TridentCollector collectorMock = mock(TridentCollector.class);
- TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
- when(consumerMock.assignment()).thenReturn(Collections.singleton(tp));
- ConsumerFactory<String, String> consumerFactory = spoutConfig -> consumerMock;
- KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
- SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
- .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
- .build(),
- mock(TopologyContext.class),
- consumerFactory, new TopicAssigner());
- KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
- Map<String, Object> lastBatchMeta = null;
- //Emit 10 empty batches, simulating no new records being present in Kafka
- for(int i = 0; i < 10; i++) {
- clearInvocations(consumerMock);
- when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
- TransactionAttempt txid = new TransactionAttempt((long) i, 0);
- lastBatchMeta = emitter.emitPartitionBatchNew(txid, collectorMock, kttp, lastBatchMeta);
- assertThat(lastBatchMeta, nullValue());
- if (i == 0) {
- InOrder inOrder = inOrder(consumerMock, collectorMock);
- inOrder.verify(consumerMock).seekToEnd(Collections.singleton(tp));
- inOrder.verify(consumerMock).poll(anyLong());
- } else {
- verify(consumerMock).poll(anyLong());
- }
- }
- clearInvocations(consumerMock);
- //Simulate that new records were added in Kafka, and check that the next batch contains these records
- long firstOffset = 0;
- int numRecords = 10;
- when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(
- tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, numRecords))));
- lastBatchMeta = emitter.emitPartitionBatchNew(new TransactionAttempt(11L, 0), collectorMock, kttp, lastBatchMeta);
-
- verify(consumerMock).poll(anyLong());
- verify(collectorMock, times(numRecords)).emit(anyList());
- KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(lastBatchMeta);
- assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset));
- assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
- }
-
- @Test
- public void testReEmitBatch() {
- TridentCollector collectorMock = mock(TridentCollector.class);
- TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
- long firstOffset = 50;
- int numRecordsEmitted = 10;
- //Make sure the consumer can return extra records, so we test that re-emit only emits the original messages
- int numRecordsPresent = 100;
- KafkaTridentSpoutBatchMetadata batchMeta = new KafkaTridentSpoutBatchMetadata(firstOffset, firstOffset + numRecordsEmitted - 1, topologyId);
- KafkaTridentSpoutEmitter<String, String> emitter = createEmitterWithMessages(tp, firstOffset, numRecordsPresent, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
- TransactionAttempt txid = new TransactionAttempt(10L, 0);
- KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
- emitter.reEmitPartitionBatch(txid, collectorMock, kttp, batchMeta.toMap());
-
- verify(collectorMock, times(numRecordsEmitted)).emit(emitCaptor.capture());
- List<List<Object>> emits = emitCaptor.getAllValues();
- assertThat(emits.get(0).get(0), is(firstOffset));
- assertThat(emits.get(emits.size() - 1).get(0), is(firstOffset + numRecordsEmitted - 1));
- }
-
- @Test
- public void testReEmitBatchForOldTopologyWhenIgnoringCommittedOffsets() {
- //In some cases users will want to drop retrying old batches, e.g. if the topology should start over from scratch.
- //If the FirstPollOffsetStrategy ignores committed offsets, we should not retry batches for old topologies
- //The batch retry should be skipped entirely
- TridentCollector collectorMock = mock(TridentCollector.class);
- TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
- long firstOffset = 50;
- int numRecordsEmitted = 10;
- KafkaTridentSpoutBatchMetadata batchMeta = new KafkaTridentSpoutBatchMetadata(firstOffset, firstOffset + numRecordsEmitted - 1, "a new storm id");
- KafkaTridentSpoutEmitter<String, String> emitter = createEmitterWithMessages(tp, firstOffset, numRecordsEmitted, FirstPollOffsetStrategy.EARLIEST);
- TransactionAttempt txid = new TransactionAttempt(10L, 0);
- KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
- emitter.reEmitPartitionBatch(txid, collectorMock, kttp, batchMeta.toMap());
-
- verify(collectorMock, never()).emit(anyList());
- }
-
-}
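A note on the test setup being moved here: the createEmitterWithMessages helper above drives the emitter through kafka-clients' MockConsumer instead of a live broker. A stripped-down sketch of that pattern, independent of any Storm classes, follows; the topic name and record values are illustrative only, and the explicit seekToBeginning stands in for the seek the emitter itself performs according to its FirstPollOffsetStrategy.

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;

    public class MockConsumerSketch {
        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("test-topic", 0);
            MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
            consumer.assign(Collections.singleton(tp));
            //Pretend the partition's log starts at offset 0, then seed it with a few records
            consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
            for (long offset = 0; offset < 5; offset++) {
                consumer.addRecord(new ConsumerRecord<>(tp.topic(), tp.partition(), offset, "key", "value-" + offset));
            }
            //Position the consumer before polling, as the emitter's seek logic would
            consumer.seekToBeginning(Collections.singleton(tp));
            ConsumerRecords<String, String> polled = consumer.poll(1000);
            System.out.println("polled " + polled.count() + " records"); //expect 5
        }
    }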
http://git-wip-us.apache.org/repos/asf/storm/blob/642ae369/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9836fab..2bebb01 100644
--- a/pom.xml
+++ b/pom.xml
@@ -303,7 +303,7 @@
<jackson.version>2.9.4</jackson.version>
- <storm.kafka.client.version>0.10.1.0</storm.kafka.client.version>
+ <storm.kafka.client.version>0.11.0.0</storm.kafka.client.version>
<!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->
<java.unit.test.exclude>org.apache.storm.testing.IntegrationTest, org.apache.storm.testing.PerformanceTest</java.unit.test.exclude>
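For readers skimming the diff, the contract the removed tests (and their replacements in KafkaTridentSpoutEmitterEmitTest) verify is easy to restate: emitPartitionBatchNew returns a serializable meta map recording the first and last offsets of the batch it just emitted, and reEmitPartitionBatch replays exactly that offset range, except that the replay is skipped entirely when the recorded topology id is stale and the configured FirstPollOffsetStrategy ignores committed offsets. In the real code this round trip goes through KafkaTridentSpoutBatchMetadata.toMap()/fromMap(); the plain-Java sketch below only illustrates its shape, and the class and method names are hypothetical, not the storm-kafka-client API.

    import java.util.HashMap;
    import java.util.Map;

    public class BatchMetaRoundTripSketch {

        //Shape of the meta map the emitter hands back to Trident after a batch
        static Map<String, Object> metaFor(long firstOffset, long lastOffset, String topologyId) {
            Map<String, Object> meta = new HashMap<>();
            meta.put("firstOffset", firstOffset);
            meta.put("lastOffset", lastOffset);
            meta.put("topologyId", topologyId);
            return meta;
        }

        //Mirrors testReEmitBatchForOldTopologyWhenIgnoringCommittedOffsets: when the
        //strategy ignores committed offsets and the batch came from an earlier
        //deployment of the topology, the replay is dropped entirely
        static boolean shouldSkipReplay(Map<String, Object> meta, String currentTopologyId,
                boolean strategyIgnoresCommittedOffsets) {
            return strategyIgnoresCommittedOffsets
                    && !currentTopologyId.equals(meta.get("topologyId"));
        }

        public static void main(String[] args) {
            Map<String, Object> meta = metaFor(50L, 59L, "storm-id-1");
            //A same-topology replay re-emits offsets 50..59, nothing more
            System.out.println("replay " + meta.get("firstOffset") + ".." + meta.get("lastOffset"));
            //A stale-topology replay under an offset-ignoring strategy is skipped
            System.out.println("skip: " + shouldSkipReplay(meta, "storm-id-2", true));
        }
    }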
[2/2] storm git commit: Merge branch 'STORM-2990' of https://github.com/srdo/storm into asfgit-master
Posted by sr...@apache.org.
Merge branch 'STORM-2990' of https://github.com/srdo/storm into asfgit-master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e46a87a8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e46a87a8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e46a87a8
Branch: refs/heads/master
Commit: e46a87a862f4d9fc76358f76f9fb4aad5264b080
Parents: 43859e9 642ae36
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Dec 14 16:55:35 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Fri Dec 14 16:55:35 2018 +0100
----------------------------------------------------------------------
external/storm-kafka-client/pom.xml | 8 +
.../trident/KafkaTridentSpoutBatchMetadata.java | 13 -
.../spout/trident/KafkaTridentSpoutEmitter.java | 43 ++-
.../java/org/apache/storm/kafka/KafkaUnit.java | 49 ++-
.../kafka/spout/KafkaSpoutAbstractTest.java | 1 -
.../spout/KafkaSpoutMessagingGuaranteeTest.java | 2 +-
.../KafkaTridentSpoutBatchMetadataTest.java | 17 -
.../KafkaTridentSpoutEmitterEmitTest.java | 306 ++++++++++++++++++
...afkaTridentSpoutEmitterPartitioningTest.java | 142 ++++++++
.../trident/KafkaTridentSpoutEmitterTest.java | 320 -------------------
pom.xml | 2 +-
11 files changed, 508 insertions(+), 395 deletions(-)
----------------------------------------------------------------------