Posted to commits@storm.apache.org by sr...@apache.org on 2017/07/19 18:25:04 UTC
[1/4] storm git commit: STORM-2542: Remove storm-kafka-client KafkaConsumer.subscribe API option
Repository: storm
Updated Branches:
refs/heads/master 10d381b30 -> 3580dbc80
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 2d55520..23630a6 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -19,9 +19,10 @@ import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfigu
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.hasKey;
import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@@ -42,6 +43,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.subscription.Subscription;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
@@ -74,14 +76,11 @@ public class KafkaSpoutRebalanceTest {
}
//Returns messageIds in order of emission
- private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
- //Setup spout with mock consumer so we can get at the rebalance listener
+ private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture) {
+ //Setup spout with mock consumer so we can get at the rebalance listener
spout.open(conf, contextMock, collectorMock);
spout.activate();
- ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
- verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
//Assign partitions to the spout
ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -95,9 +94,9 @@ public class KafkaSpoutRebalanceTest {
Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(firstPartitionRecords))
- .thenReturn(new ConsumerRecords(secondPartitionRecords))
- .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+ .thenReturn(new ConsumerRecords<>(firstPartitionRecords))
+ .thenReturn(new ConsumerRecords<>(secondPartitionRecords))
+ .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
//Emit the messages
spout.nextTuple();
@@ -122,7 +121,12 @@ public class KafkaSpoutRebalanceTest {
public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
//Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
try (SimulatedTime simulatedTime = new SimulatedTime()) {
- KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+ ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ Subscription subscriptionMock = mock(Subscription.class);
+ doNothing()
+ .when(subscriptionMock)
+ .subscribe(any(), rebalanceListenerCapture.capture(), any());
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1)
.setOffsetCommitPeriodMs(offsetCommitPeriodMs)
.build(), consumerFactory);
String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
@@ -130,7 +134,8 @@ public class KafkaSpoutRebalanceTest {
TopicPartition assignedPartition = new TopicPartition(topic, 2);
//Emit a message on each partition and revoke the first partition
- List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
+ spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
//Ack both emitted tuples
spout.ack(emittedMessageIds.get(0));
@@ -152,8 +157,13 @@ public class KafkaSpoutRebalanceTest {
@Test
public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
//Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
+ ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ Subscription subscriptionMock = mock(Subscription.class);
+ doNothing()
+ .when(subscriptionMock)
+ .subscribe(any(), rebalanceListenerCapture.capture(), any());
KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
- KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1)
.setOffsetCommitPeriodMs(10)
.setRetry(retryServiceMock)
.build(), consumerFactory);
@@ -166,7 +176,8 @@ public class KafkaSpoutRebalanceTest {
.thenReturn(new KafkaSpoutMessageId(assignedPartition, 0));
//Emit a message on each partition and revoke the first partition
- List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
+ spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
//Check that only two message ids were generated
verify(retryServiceMock, times(2)).getMessageId(anyObject());
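Since the spout no longer calls KafkaConsumer.subscribe, these tests capture the rebalance listener from a mocked Subscription rather than from the consumer mock. Below is a minimal sketch of that pattern, assuming the three-argument subscribe signature (consumer, rebalance listener, topology context) implied by the captured arguments above:

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.storm.kafka.spout.subscription.Subscription;
import org.mockito.ArgumentCaptor;

public class SubscriptionMockSketch {

    // Stubs the mocked Subscription and returns a captor that will hold the
    // rebalance listener the spout registers during open()/activate().
    public static ArgumentCaptor<ConsumerRebalanceListener> captureRebalanceListener(Subscription subscriptionMock) {
        ArgumentCaptor<ConsumerRebalanceListener> capture =
            ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
        doNothing()
            .when(subscriptionMock)
            .subscribe(any(), capture.capture(), any());
        return capture;
    }
}

Once spout.open(...) and spout.activate() have run, capture.getValue() yields the registered listener, and invoking its onPartitionsAssigned/onPartitionsRevoked methods simulates a rebalance, as emitOneMessagePerPartitionThenRevokeOnePartition does above.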
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index d84f4da..078f7a1 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -30,80 +30,71 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
public class KafkaSpoutRetryLimitTest {
-
+
private final long offsetCommitPeriodMs = 2_000;
private final TopologyContext contextMock = mock(TopologyContext.class);
private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
private final Map<String, Object> conf = new HashMap<>();
private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
private KafkaConsumer<String, String> consumerMock;
- private KafkaSpout<String, String> spout;
- private KafkaSpoutConfig spoutConfig;
-
+ private KafkaSpoutConfig<String, String> spoutConfig;
+
public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
- new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
- 0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
-
- private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) {
+ new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+ 0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
spoutConfig = getKafkaSpoutConfigBuilder(-1)
- .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
- .setRetry(ZERO_RETRIES_RETRY_SERVICE)
- .build();
-
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+ .build();
consumerMock = mock(KafkaConsumer.class);
- KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
-
- spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
- spout.open(conf, contextMock, collectorMock);
- spout.activate();
-
- ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
- verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
- //Assign partitions to the spout
- ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
- consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
}
-
+
@Test
public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
//Spout should ack failed messages after they hit the retry limit
try (SimulatedTime simulatedTime = new SimulatedTime()) {
- setupSpoutWithNoRetry(Collections.singleton(partition));
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
int lastOffset = 3;
for (int i = 0; i <= lastOffset; i++) {
- recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
}
records.put(partition, recordsForPartition);
-
+
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(records));
-
+ .thenReturn(new ConsumerRecords<>(records));
+
for (int i = 0; i < recordsForPartition.size(); i++) {
spout.nextTuple();
}
-
+
ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
verify(collectorMock, times(recordsForPartition.size())).emit(anyObject(), anyObject(), messageIds.capture());
-
+
for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
spout.fail(messageId);
}
@@ -111,16 +102,15 @@ public class KafkaSpoutRetryLimitTest {
// Advance time and then trigger call to kafka consumer commit
Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
spout.nextTuple();
-
- ArgumentCaptor<Map> committedOffsets=ArgumentCaptor.forClass(Map.class);
+
InOrder inOrder = inOrder(consumerMock);
- inOrder.verify(consumerMock).commitSync(committedOffsets.capture());
+ inOrder.verify(consumerMock).commitSync(commitCapture.capture());
inOrder.verify(consumerMock).poll(anyLong());
//verify that Offset 3 was committed for the given TopicPartition
- assertTrue(committedOffsets.getValue().containsKey(partition));
- assertEquals(lastOffset, ((OffsetAndMetadata) (committedOffsets.getValue().get(partition))).offset());
+ assertTrue(commitCapture.getValue().containsKey(partition));
+ assertEquals(lastOffset, ((OffsetAndMetadata) (commitCapture.getValue().get(partition))).offset());
}
}
-
-}
\ No newline at end of file
+
+}
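The switch from a locally built ArgumentCaptor<Map> to a @Captor field is what lets the test capture the generic Map<TopicPartition, OffsetAndMetadata> without raw types or unchecked casts. A minimal, self-contained sketch of the annotation-driven setup (class and test names here are illustrative):

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.MockitoAnnotations;

public class CaptorSketchTest {

    // @Captor builds a captor for a generic type; ArgumentCaptor.forClass(Map.class)
    // could only produce a raw ArgumentCaptor<Map> requiring unchecked casts.
    @Captor
    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void capturesCommittedOffsets() {
        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
        TopicPartition partition = new TopicPartition("test", 1);
        consumerMock.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(3)));

        verify(consumerMock).commitSync(commitCapture.capture());
        assertEquals(3L, commitCapture.getValue().get(partition).offset());
    }
}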
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index 9ebdcf7..261c654 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -22,12 +22,15 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.isIn;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -58,7 +61,7 @@ public class MaxUncommittedOffsetTest {
private final int maxUncommittedOffsets = 10;
private final int maxPollRecords = 5;
private final int initialRetryDelaySecs = 60;
- private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+ private final KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
.setOffsetCommitPeriodMs(commitOffsetPeriodMs)
.setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
.setMaxUncommittedOffsets(maxUncommittedOffsets)
@@ -93,6 +96,8 @@ public class MaxUncommittedOffsetTest {
private void initializeSpout(int msgCount) throws Exception {
populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+ when(topologyContext.getThisTaskIndex()).thenReturn(0);
+ when(topologyContext.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
spout.open(conf, topologyContext, collector);
spout.activate();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
deleted file mode 100644
index e97c7e1..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Before;
-import org.junit.Test;
-
-public class NamedTopicFilterTest {
-
- private KafkaConsumer<?, ?> consumerMock;
-
- @Before
- public void setUp() {
- consumerMock = mock(KafkaConsumer.class);
- }
-
- @Test
- public void testFilter() {
- String matchingTopicOne = "test-1";
- String matchingTopicTwo = "test-11";
- String unmatchedTopic = "unmatched";
-
- NamedTopicFilter filter = new NamedTopicFilter(matchingTopicOne, matchingTopicTwo);
-
- when(consumerMock.partitionsFor(matchingTopicOne)).thenReturn(Collections.singletonList(createPartitionInfo(matchingTopicOne, 0)));
- List<PartitionInfo> partitionTwoPartitions = new ArrayList<>();
- partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
- partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
- when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions);
- when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
-
- List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
-
- assertThat("Expected filter to pass only topics with exact name matches", matchedPartitions,
- containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
-
- }
-
- private PartitionInfo createPartitionInfo(String topic, int partition) {
- return new PartitionInfo(topic, partition, null, null, null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
deleted file mode 100644
index 877efdc..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Before;
-import org.junit.Test;
-
-public class PatternTopicFilterTest {
-
- private KafkaConsumer<?, ?> consumerMock;
-
- @Before
- public void setUp(){
- consumerMock = mock(KafkaConsumer.class);
- }
-
- @Test
- public void testFilter() {
- Pattern pattern = Pattern.compile("test-\\d+");
- PatternTopicFilter filter = new PatternTopicFilter(pattern);
-
- String matchingTopicOne = "test-1";
- String matchingTopicTwo = "test-11";
- String unmatchedTopic = "unmatched";
-
- Map<String, List<PartitionInfo>> allTopics = new HashMap<>();
- allTopics.put(matchingTopicOne, Collections.singletonList(createPartitionInfo(matchingTopicOne, 0)));
- List<PartitionInfo> testTwoPartitions = new ArrayList<>();
- testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
- testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
- allTopics.put(matchingTopicTwo, testTwoPartitions);
- allTopics.put(unmatchedTopic, Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
-
- when(consumerMock.listTopics()).thenReturn(allTopics);
-
- List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
-
- assertThat("Expected topic partitions matching the pattern to be passed by the filter", matchedPartitions,
- containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
- }
-
- private PartitionInfo createPartitionInfo(String topic, int partition) {
- return new PartitionInfo(topic, partition, null, null, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 7f0973b..6b92de8 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -20,6 +20,7 @@ package org.apache.storm.kafka.spout;
import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -28,7 +29,9 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -77,12 +80,12 @@ public class SingleTopicKafkaSpoutTest {
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
- KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+ KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
.setOffsetCommitPeriodMs(commitOffsetPeriodMs)
.setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
.build();
- this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
+ this.consumerSpy = spy(new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig));
this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
}
@@ -100,6 +103,8 @@ public class SingleTopicKafkaSpoutTest {
private void initializeSpout(int msgCount) throws InterruptedException, ExecutionException, TimeoutException {
populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+ when(topologyContext.getThisTaskIndex()).thenReturn(0);
+ when(topologyContext.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
spout.open(conf, topologyContext, collector);
spout.activate();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
new file mode 100644
index 0000000..5f931bb
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+
+public class SpoutWithMockedConsumerSetupHelper {
+
+ /**
+ * Creates, opens and activates a KafkaSpout using a mocked consumer.
+ * @param <K> The Kafka key type
+ * @param <V> The Kafka value type
+ * @param spoutConfig The spout config to use
+ * @param topoConf The topo conf to pass to the spout
+ * @param contextMock The topo context to pass to the spout
+ * @param collectorMock The mocked collector to pass to the spout
+ * @param consumerMock The mocked consumer
+ * @param assignedPartitions The partitions to assign to this spout. The consumer will act like these partitions are assigned to it.
+ * @return The spout
+ */
+ public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf,
+ TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) {
+
+ Map<String, List<PartitionInfo>> partitionInfos = assignedPartitions.stream()
+ .map(tp -> new PartitionInfo(tp.topic(), tp.partition(), null, null, null))
+ .collect(Collectors.groupingBy(info -> info.topic()));
+ partitionInfos.keySet()
+ .forEach(key -> when(consumerMock.partitionsFor(key))
+ .thenReturn(partitionInfos.get(key)));
+ KafkaConsumerFactory<K, V> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+
+ KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+ when(contextMock.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
+ when(contextMock.getThisTaskIndex()).thenReturn(0);
+
+ spout.open(topoConf, contextMock, collectorMock);
+ spout.activate();
+
+ verify(consumerMock).assign(assignedPartitions);
+
+ return spout;
+ }
+
+}
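For reference, a call site mirrors the retry-limit test above; spoutConfig, conf, contextMock, collectorMock and consumerMock are assumed to be prepared as in that test:

// Opens and activates a spout whose mocked consumer owns a single partition.
TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(
    spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
// From here, stub consumerMock.poll(anyLong()) to feed records into nextTuple().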
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 62dbfe5..d2f38b0 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.kafka.spout.builders;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -24,16 +25,26 @@ import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.kafka.spout.subscription.Subscription;
import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
public class SingleTopicKafkaSpoutConfiguration {
+
public static final String STREAM = "test_stream";
public static final String TOPIC = "test";
+ /**
+ * Retry in a tight loop (keep unit tests fast).
+ */
+ public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
+ new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+ DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
public static Config getConfig() {
Config config = new Config();
config.setDebug(true);
@@ -47,20 +58,27 @@ public class SingleTopicKafkaSpoutConfiguration {
return tp.createTopology();
}
- public static KafkaSpoutConfig.Builder<String,String> getKafkaSpoutConfigBuilder(int port) {
- return KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)
- .setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()),
- new Fields("topic", "key", "value"), STREAM)
- .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
- .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
- .setRetry(getRetryService())
- .setOffsetCommitPeriodMs(10_000)
- .setFirstPollOffsetStrategy(EARLIEST)
- .setMaxUncommittedOffsets(250)
- .setPollTimeoutMs(1000);
+ public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(int port) {
+ return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC));
+ }
+
+ public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(Subscription subscription, int port) {
+ return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, subscription));
}
-
+
+ private static KafkaSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {
+ return config.setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()),
+ new Fields("topic", "key", "value"), STREAM)
+ .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+ .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
+ .setRetry(getRetryService())
+ .setOffsetCommitPeriodMs(10_000)
+ .setFirstPollOffsetStrategy(EARLIEST)
+ .setMaxUncommittedOffsets(250)
+ .setPollTimeoutMs(1000);
+ }
+
protected static KafkaSpoutRetryService getRetryService() {
- return KafkaSpoutConfig.UNIT_TEST_RETRY_SERVICE;
+ return UNIT_TEST_RETRY_SERVICE;
}
}
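The Subscription-accepting overload is what lets the rebalance and retry tests inject a mock while reusing the common settings; a sketch (the mocked Subscription is assumed to be stubbed as in those tests, and port -1 works because no real broker is contacted):

Subscription subscriptionMock = mock(Subscription.class);
KafkaSpoutConfig<String, String> spoutConfig =
    SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder(subscriptionMock, -1)
        .setOffsetCommitPeriodMs(10_000)
        .build();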
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
new file mode 100644
index 0000000..3985619
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NamedTopicFilterTest {
+
+ private KafkaConsumer<?, ?> consumerMock;
+
+ @Before
+ public void setUp() {
+ consumerMock = mock(KafkaConsumer.class);
+ }
+
+ @Test
+ public void testFilter() {
+ String matchingTopicOne = "test-1";
+ String matchingTopicTwo = "test-11";
+ String unmatchedTopic = "unmatched";
+
+ NamedTopicFilter filter = new NamedTopicFilter(matchingTopicOne, matchingTopicTwo);
+
+ when(consumerMock.partitionsFor(matchingTopicOne)).thenReturn(Collections.singletonList(createPartitionInfo(matchingTopicOne, 0)));
+ List<PartitionInfo> partitionTwoPartitions = new ArrayList<>();
+ partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
+ partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
+ when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions);
+ when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
+
+ List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
+
+ assertThat("Expected filter to pass only topics with exact name matches", matchedPartitions,
+ containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
+
+ }
+
+ private PartitionInfo createPartitionInfo(String topic, int partition) {
+ return new PartitionInfo(topic, partition, null, null, null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
new file mode 100644
index 0000000..67411e3
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PatternTopicFilterTest {
+
+ private KafkaConsumer<?, ?> consumerMock;
+
+ @Before
+ public void setUp(){
+ consumerMock = mock(KafkaConsumer.class);
+ }
+
+ @Test
+ public void testFilter() {
+ Pattern pattern = Pattern.compile("test-\\d+");
+ PatternTopicFilter filter = new PatternTopicFilter(pattern);
+
+ String matchingTopicOne = "test-1";
+ String matchingTopicTwo = "test-11";
+ String unmatchedTopic = "unmatched";
+
+ Map<String, List<PartitionInfo>> allTopics = new HashMap<>();
+ allTopics.put(matchingTopicOne, Collections.singletonList(createPartitionInfo(matchingTopicOne, 0)));
+ List<PartitionInfo> testTwoPartitions = new ArrayList<>();
+ testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
+ testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
+ allTopics.put(matchingTopicTwo, testTwoPartitions);
+ allTopics.put(unmatchedTopic, Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
+
+ when(consumerMock.listTopics()).thenReturn(allTopics);
+
+ List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
+
+ assertThat("Expected topic partitions matching the pattern to be passed by the filter", matchedPartitions,
+ containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
+ }
+
+ private PartitionInfo createPartitionInfo(String topic, int partition) {
+ return new PartitionInfo(topic, partition, null, null, null);
+ }
+}
[4/4] storm git commit: Changelog: STORM-2542
Posted by sr...@apache.org.
Changelog: STORM-2542
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3580dbc8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3580dbc8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3580dbc8
Branch: refs/heads/master
Commit: 3580dbc806307fc1bda805f04d910657a6fa61d5
Parents: f67699c
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Wed Jul 19 20:18:11 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Jul 19 20:18:11 2017 +0200
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3580dbc8/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cda77f1..625e7b5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-2542: Remove storm-kafka-client KafkaConsumer.subscribe API option, make KafkaConsumer.assign the default
* STORM-2133: add page-rendered-at timestamp on the UI
* STORM-2541: Fix storm-kafka-client manual subscription not being able to start consuming
* STORM-2622: Add owner resource summary on storm UI
[3/4] storm git commit: Merge branch 'STORM-2542' of https://github.com/srdo/storm into STORM-2542-merge
Posted by sr...@apache.org.
Merge branch 'STORM-2542' of https://github.com/srdo/storm into STORM-2542-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f67699cc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f67699cc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f67699cc
Branch: refs/heads/master
Commit: f67699cce3d71c416ee4db3e8692c289cae0f4db
Parents: 10d381b fdb649e
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Wed Jul 19 20:15:20 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Jul 19 20:15:20 2017 +0200
----------------------------------------------------------------------
docs/storm-kafka-client.md | 7 +-
.../storm/kafka/spout/KafkaSpoutConfig.java | 175 ++++++++++---------
.../spout/ManualPartitionSubscription.java | 71 --------
.../storm/kafka/spout/ManualPartitioner.java | 40 -----
.../storm/kafka/spout/NamedSubscription.java | 61 -------
.../storm/kafka/spout/NamedTopicFilter.java | 67 -------
.../storm/kafka/spout/PatternSubscription.java | 54 ------
.../storm/kafka/spout/PatternTopicFilter.java | 69 --------
.../spout/RoundRobinManualPartitioner.java | 50 ------
.../apache/storm/kafka/spout/Subscription.java | 53 ------
.../apache/storm/kafka/spout/TopicFilter.java | 38 ----
.../ManualPartitionSubscription.java | 72 ++++++++
.../spout/subscription/ManualPartitioner.java | 40 +++++
.../spout/subscription/NamedTopicFilter.java | 67 +++++++
.../spout/subscription/PatternTopicFilter.java | 69 ++++++++
.../RoundRobinManualPartitioner.java | 50 ++++++
.../kafka/spout/subscription/Subscription.java | 53 ++++++
.../kafka/spout/subscription/TopicFilter.java | 38 ++++
.../storm/kafka/spout/KafkaSpoutCommitTest.java | 36 ++--
.../storm/kafka/spout/KafkaSpoutEmitTest.java | 48 ++---
.../kafka/spout/KafkaSpoutRebalanceTest.java | 37 ++--
.../kafka/spout/KafkaSpoutRetryLimitTest.java | 74 ++++----
.../kafka/spout/MaxUncommittedOffsetTest.java | 7 +-
.../storm/kafka/spout/NamedTopicFilterTest.java | 69 --------
.../kafka/spout/PatternTopicFilterTest.java | 73 --------
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 9 +-
.../SpoutWithMockedConsumerSetupHelper.java | 74 ++++++++
.../SingleTopicKafkaSpoutConfiguration.java | 44 +++--
.../subscription/NamedTopicFilterTest.java | 68 +++++++
.../subscription/PatternTopicFilterTest.java | 73 ++++++++
30 files changed, 827 insertions(+), 859 deletions(-)
----------------------------------------------------------------------
[2/4] storm git commit: STORM-2542: Remove storm-kafka-client KafkaConsumer.subscribe API option
Posted by sr...@apache.org.
STORM-2542: Remove storm-kafka-client KafkaConsumer.subscribe API option
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fdb649e3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fdb649e3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fdb649e3
Branch: refs/heads/master
Commit: fdb649e352e05fd849cafa312bbd62fc75694579
Parents: cd6ca3e
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Mon Jun 5 14:59:19 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Jul 19 00:18:03 2017 +0200
----------------------------------------------------------------------
docs/storm-kafka-client.md | 7 +-
.../storm/kafka/spout/KafkaSpoutConfig.java | 175 ++++++++++---------
.../spout/ManualPartitionSubscription.java | 71 --------
.../storm/kafka/spout/ManualPartitioner.java | 40 -----
.../storm/kafka/spout/NamedSubscription.java | 61 -------
.../storm/kafka/spout/NamedTopicFilter.java | 67 -------
.../storm/kafka/spout/PatternSubscription.java | 54 ------
.../storm/kafka/spout/PatternTopicFilter.java | 69 --------
.../spout/RoundRobinManualPartitioner.java | 50 ------
.../apache/storm/kafka/spout/Subscription.java | 53 ------
.../apache/storm/kafka/spout/TopicFilter.java | 38 ----
.../ManualPartitionSubscription.java | 72 ++++++++
.../spout/subscription/ManualPartitioner.java | 40 +++++
.../spout/subscription/NamedTopicFilter.java | 67 +++++++
.../spout/subscription/PatternTopicFilter.java | 69 ++++++++
.../RoundRobinManualPartitioner.java | 50 ++++++
.../kafka/spout/subscription/Subscription.java | 53 ++++++
.../kafka/spout/subscription/TopicFilter.java | 38 ++++
.../storm/kafka/spout/KafkaSpoutCommitTest.java | 36 ++--
.../storm/kafka/spout/KafkaSpoutEmitTest.java | 48 ++---
.../kafka/spout/KafkaSpoutRebalanceTest.java | 37 ++--
.../kafka/spout/KafkaSpoutRetryLimitTest.java | 74 ++++----
.../kafka/spout/MaxUncommittedOffsetTest.java | 7 +-
.../storm/kafka/spout/NamedTopicFilterTest.java | 69 --------
.../kafka/spout/PatternTopicFilterTest.java | 73 --------
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 9 +-
.../SpoutWithMockedConsumerSetupHelper.java | 74 ++++++++
.../SingleTopicKafkaSpoutConfiguration.java | 44 +++--
.../subscription/NamedTopicFilterTest.java | 68 +++++++
.../subscription/PatternTopicFilterTest.java | 73 ++++++++
30 files changed, 827 insertions(+), 859 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index ada8619..99b9ae5 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -240,12 +240,9 @@ streams. If you are doing this for Trident a value must be in the List returned
otherwise trident can throw exceptions.
-### Manual Partition Control (ADVANCED)
+### Manual Partition Assignment (ADVANCED)
-By default Kafka will automatically assign partitions to the current set of spouts. It handles lots of things, but in some cases you may want to manually assign the partitions.
-This can cause less churn in the assignments when spouts go down and come back up, but it can result in a lot of issues if not done right. This can all be handled by subclassing
-Subscription and we have a few implementations that you can look at for examples on how to do this. ManualPartitionNamedSubscription and ManualPartitionPatternSubscription. Again
-please be careful when using these or implementing your own.
+By default the KafkaSpout instances will be assigned partitions using a round robin strategy. If you need to customize partition assignment, you must implement the `ManualPartitioner` interface. The implementation can be passed to the `ManualPartitionSubscription` constructor, and the `Subscription` can then be set in the `KafkaSpoutConfig` via the `KafkaSpoutConfig.Builder` constructor. Please take care when supplying a custom implementation, since an incorrect `ManualPartitioner` implementation could leave some partitions unread, or concurrently read by multiple spout instances. See the `RoundRobinManualPartitioner` for an example of how to implement this functionality.
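Concretely, wiring together the classes named in the paragraph above looks like the following sketch; the broker address and topic name are placeholders, and the single-argument KafkaSpout constructor is assumed:

import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription;
import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;

public class ManualAssignmentExample {

    public static KafkaSpout<String, String> buildSpout() {
        // Combine a partitioner (here the provided round-robin implementation)
        // with a topic filter, then pass the resulting Subscription to the Builder.
        ManualPartitionSubscription subscription = new ManualPartitionSubscription(
            new RoundRobinManualPartitioner(), new NamedTopicFilter("my-topic"));
        KafkaSpoutConfig<String, String> config =
            new KafkaSpoutConfig.Builder<String, String>("broker-1:9092", subscription)
                .build();
        return new KafkaSpout<>(config);
    }
}

Swapping a custom ManualPartitioner implementation in place of RoundRobinManualPartitioner is the extension point the paragraph describes.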
## Use the Maven Shade Plugin to Build the Uber Jar
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 6f09f5f..72fa52e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -24,39 +24,41 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription;
+import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
+import org.apache.storm.kafka.spout.subscription.PatternTopicFilter;
+import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.Subscription;
import org.apache.storm.tuple.Fields;
/**
* KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics.
*/
public class KafkaSpoutConfig<K, V> implements Serializable {
+
private static final long serialVersionUID = 141902646130682494L;
// 200ms
- public static final long DEFAULT_POLL_TIMEOUT_MS = 200;
+ public static final long DEFAULT_POLL_TIMEOUT_MS = 200;
// 30s
- public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;
+ public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;
// Retry forever
- public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
+ public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
// 10,000,000 records => 80MBs of memory footprint in the worst case
- public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;
+ public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;
// 2s
- public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
+ public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
- public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
- new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
- DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
- /**
- * Retry in a tight loop (keep unit tests fasts) do not use in production.
- */
- public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
- new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0),
- DEFAULT_MAX_RETRIES, TimeInterval.milliSeconds(0));
-
+ public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
+ new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+ DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
+
// Kafka consumer configuration
private final Map<String, Object> kafkaProps;
private final Subscription subscription;
@@ -73,9 +75,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
/**
* Creates a new KafkaSpoutConfig using a Builder.
+ *
* @param builder The Builder to construct the KafkaSpoutConfig from
*/
- public KafkaSpoutConfig(Builder<K,V> builder) {
+ public KafkaSpoutConfig(Builder<K, V> builder) {
this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
this.subscription = builder.subscription;
this.translator = builder.translator;
@@ -108,12 +111,13 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
EARLIEST,
LATEST,
UNCOMMITTED_EARLIEST,
- UNCOMMITTED_LATEST
+ UNCOMMITTED_LATEST
}
-
- public static class Builder<K,V> {
+
+ public static class Builder<K, V> {
+
private final Map<String, Object> kafkaProps;
- private Subscription subscription;
+ private final Subscription subscription;
private RecordTranslator<K, V> translator;
private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
@@ -123,20 +127,22 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
private boolean emitNullTuples = false;
- public Builder(String bootstrapServers, String ... topics) {
- this(bootstrapServers, new NamedSubscription(topics));
+ public Builder(String bootstrapServers, String... topics) {
+ this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
}
-
- public Builder(String bootstrapServers, Collection<String> topics) {
- this(bootstrapServers, new NamedSubscription(topics));
+
+ public Builder(String bootstrapServers, Set<String> topics) {
+ this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(),
+ new NamedTopicFilter(topics)));
}
-
+
public Builder(String bootstrapServers, Pattern topics) {
- this(bootstrapServers, new PatternSubscription(topics));
+ this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
}
-
+
/**
* Create a KafkaSpoutConfig builder with default property values and no key/value deserializers.
+ *
* @param bootstrapServers The bootstrap servers the consumer will use
* @param subscription The subscription defining which topics and partitions each spout instance will read.
*/
@@ -149,30 +155,30 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this.subscription = subscription;
this.translator = new DefaultRecordTranslator<>();
}
-
+
/**
- * Set a {@link KafkaConsumer} property.
+ * Set a {@link KafkaConsumer} property.
*/
- public Builder<K,V> setProp(String key, Object value) {
+ public Builder<K, V> setProp(String key, Object value) {
kafkaProps.put(key, value);
return this;
}
-
+
/**
* Set multiple {@link KafkaConsumer} properties.
*/
- public Builder<K,V> setProp(Map<String, Object> props) {
+ public Builder<K, V> setProp(Map<String, Object> props) {
kafkaProps.putAll(props);
return this;
}
-
+
/**
* Set multiple {@link KafkaConsumer} properties.
*/
- public Builder<K,V> setProp(Properties props) {
+ public Builder<K, V> setProp(Properties props) {
props.forEach((key, value) -> {
if (key instanceof String) {
- kafkaProps.put((String)key, value);
+ kafkaProps.put((String) key, value);
} else {
throw new IllegalArgumentException("Kafka Consumer property keys must be Strings");
}
@@ -183,46 +189,51 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
//Spout Settings
/**
* Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 200ms.
+ *
* @param pollTimeoutMs time in ms
*/
- public Builder<K,V> setPollTimeoutMs(long pollTimeoutMs) {
+ public Builder<K, V> setPollTimeoutMs(long pollTimeoutMs) {
this.pollTimeoutMs = pollTimeoutMs;
return this;
}
/**
* Specifies the period, in milliseconds, at which the offset commit task is called. Default is 30s.
+ *
* @param offsetCommitPeriodMs time in ms
*/
- public Builder<K,V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
+ public Builder<K, V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
this.offsetCommitPeriodMs = offsetCommitPeriodMs;
return this;
}
/**
- * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
- * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
- * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
- * Note that this limit can in some cases be exceeded, but no partition will exceed this limit by more than maxPollRecords - 1.
+ * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place. Once this
+ * limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number of pending offsets
+ * below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}. Note that this limit can in some cases be exceeded,
+ * but no partition will exceed this limit by more than maxPollRecords - 1.
+ *
* @param maxUncommittedOffsets max number of records that can be be pending commit
*/
- public Builder<K,V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
+ public Builder<K, V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
this.maxUncommittedOffsets = maxUncommittedOffsets;
return this;
}
/**
- * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start.
- * Please refer to to the documentation in {@link FirstPollOffsetStrategy}
+ * Sets the offset used by the Kafka spout in the first poll to the Kafka broker upon process start. Please refer to the
+ * documentation in {@link FirstPollOffsetStrategy}.
+ *
* @param firstPollOffsetStrategy Offset used by Kafka spout first poll
- * */
+ */
public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
this.firstPollOffsetStrategy = firstPollOffsetStrategy;
return this;
}
-
+
/**
* Sets the retry service for the spout to use.
+ *
* @param retryService the new retry service
* @return the builder (this).
*/
@@ -238,9 +249,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this.translator = translator;
return this;
}
-
+
/**
* Configure a translator with tuples to be emitted on the default stream.
+ *
* @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
* @param fields the names of the fields extracted
* @return this to be able to chain configuration
@@ -248,9 +260,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
return setRecordTranslator(new SimpleRecordTranslator<>(func, fields));
}
-
+
/**
* Configure a translator with tuples to be emitted to a given stream.
+ *
* @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
* @param fields the names of the fields extracted
* @param stream the stream to emit the tuples on
@@ -259,12 +272,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
return setRecordTranslator(new SimpleRecordTranslator<>(func, fields, stream));
}
-
+
/**
- * Sets partition refresh period in milliseconds. This is how often kafka will be polled
- * to check for new topics and/or new partitions.
- * This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and
+ * Sets the partition refresh period in milliseconds. This is how often Kafka will be polled to check for new topics and/or new
+ * partitions. This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and
 * PatternSubscription rely on Kafka to handle this instead.
+ *
* @param partitionRefreshPeriodMs time in milliseconds
* @return the builder (this)
*/
@@ -274,8 +287,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
}
/**
- * Specifies if the spout should emit null tuples to the component downstream, or rather not emit and directly
- * ack them. By default this parameter is set to false, which means that null tuples are not emitted.
+ * Specifies whether the spout should emit null tuples to the component downstream, or rather not emit and directly ack them. By default
+ * this parameter is set to false, which means that null tuples are not emitted.
+ *
 * @param emitNullTuples sets whether or not null tuples should be emitted downstream
*/
public Builder<K, V> setEmitNullTuples(boolean emitNullTuples) {
@@ -283,34 +297,36 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
return this;
}
- public KafkaSpoutConfig<K,V> build() {
+ public KafkaSpoutConfig<K, V> build() {
return new KafkaSpoutConfig<>(this);
}
}
-
-
+
/**
* Factory method that creates a Builder with String key/value deserializers.
+ *
* @param bootstrapServers The bootstrap servers for the consumer
* @param topics The topics to subscribe to
* @return The new builder
*/
- public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
+ public static Builder<String, String> builder(String bootstrapServers, String... topics) {
return setStringDeserializers(new Builder<>(bootstrapServers, topics));
}
-
+
/**
* Factory method that creates a Builder with String key/value deserializers.
+ *
* @param bootstrapServers The bootstrap servers for the consumer
* @param topics The topics to subscribe to
* @return The new builder
*/
- public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
+ public static Builder<String, String> builder(String bootstrapServers, Set<String> topics) {
return setStringDeserializers(new Builder<>(bootstrapServers, topics));
}
-
+
/**
* Factory method that creates a Builder with String key/value deserializers.
+ *
* @param bootstrapServers The bootstrap servers for the consumer
* @param topics The topic pattern to subscribe to
* @return The new builder
@@ -318,13 +334,13 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
return setStringDeserializers(new Builder<>(bootstrapServers, topics));
}
-
+
private static Builder<String, String> setStringDeserializers(Builder<String, String> builder) {
builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return builder;
}
-
+
private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
// set defaults for properties not specified
if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
@@ -335,17 +351,18 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
/**
* Gets the properties that will be passed to the KafkaConsumer.
+ *
* @return The Kafka properties map
*/
public Map<String, Object> getKafkaProps() {
return kafkaProps;
}
-
+
public Subscription getSubscription() {
return subscription;
}
-
- public RecordTranslator<K,V> getTranslator() {
+
+ public RecordTranslator<K, V> getTranslator() {
return translator;
}
@@ -358,8 +375,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
}
public boolean isConsumerAutoCommitMode() {
- return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false
- || Boolean.valueOf((String)kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+ return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false
+ || Boolean.valueOf((String) kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
}
public String getConsumerGroupId() {
@@ -377,7 +394,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
public KafkaSpoutRetryService getRetryService() {
return retryService;
}
-
+
public long getPartitionRefreshPeriodMs() {
return partitionRefreshPeriodMs;
}
@@ -389,14 +406,14 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
@Override
public String toString() {
return "KafkaSpoutConfig{"
- + "kafkaProps=" + kafkaProps
- + ", pollTimeoutMs=" + pollTimeoutMs
- + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
- + ", maxUncommittedOffsets=" + maxUncommittedOffsets
- + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy
- + ", subscription=" + subscription
- + ", translator=" + translator
- + ", retryService=" + retryService
- + '}';
+ + "kafkaProps=" + kafkaProps
+ + ", pollTimeoutMs=" + pollTimeoutMs
+ + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
+ + ", maxUncommittedOffsets=" + maxUncommittedOffsets
+ + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy
+ + ", subscription=" + subscription
+ + ", translator=" + translator
+ + ", retryService=" + retryService
+ + '}';
}
}
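
For illustration (not part of this patch), a minimal sketch of wiring a spout through
the reworked builder. The broker address, group id, and topic pattern are placeholders;
the Pattern overload now routes through ManualPartitionSubscription with a
PatternTopicFilter and a RoundRobinManualPartitioner, as shown above.

    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;

    public class SpoutConfigExample {
        public static KafkaSpout<String, String> buildSpout() {
            // builder(String, Pattern) applies String deserializers and the
            // manual partition subscription described in the diff above.
            KafkaSpoutConfig<String, String> config = KafkaSpoutConfig
                .builder("kafka-broker:9092", Pattern.compile("events-.*"))
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "example-group")
                .setOffsetCommitPeriodMs(10_000)
                .build();
            return new KafkaSpout<>(config);
        }
    }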
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
deleted file mode 100644
index 2c65d6d..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.task.TopologyContext;
-
-public class ManualPartitionSubscription extends Subscription {
- private static final long serialVersionUID = 5633018073527583826L;
- private final ManualPartitioner partitioner;
- private final TopicFilter partitionFilter;
- private Set<TopicPartition> currentAssignment = null;
- private KafkaConsumer<?, ?> consumer = null;
- private ConsumerRebalanceListener listener = null;
- private TopologyContext context = null;
-
- public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) {
- this.partitionFilter = partitionFilter;
- this.partitioner = parter;
- }
-
- @Override
- public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
- this.consumer = consumer;
- this.listener = listener;
- this.context = context;
- refreshAssignment();
- }
-
- @Override
- public void refreshAssignment() {
- List<TopicPartition> allPartitions = partitionFilter.getFilteredTopicPartitions(consumer);
- Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
- Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context));
- if (!newAssignment.equals(currentAssignment)) {
- consumer.assign(newAssignment);
- if (currentAssignment != null) {
- listener.onPartitionsRevoked(currentAssignment);
- }
- currentAssignment = newAssignment;
- listener.onPartitionsAssigned(newAssignment);
- }
- }
-
- @Override
- public String getTopicsString() {
- return partitionFilter.getTopicsString();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
deleted file mode 100644
index 4856687..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.List;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.task.TopologyContext;
-
-/**
- * A function used to assign partitions to this spout.
- * WARNING if this is not done correctly you can really mess things up, like not reading data in some partitions.
- * The complete TopologyContext is passed in, but it is suggested that you use the index of the spout and the total
- * number of spouts to avoid missing partitions or double assigning partitions.
- */
-@FunctionalInterface
-public interface ManualPartitioner {
- /**
- * Get the partitions for this assignment
- * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering
- * @param context the context of the topology
- * @return the subset of the partitions that this spout should use.
- */
- public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
deleted file mode 100644
index 0eb48cb..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.task.TopologyContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Subscribe to all topics that follow a given list of values.
- */
-public class NamedSubscription extends Subscription {
- private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class);
- private static final long serialVersionUID = 3438543305215813839L;
- protected final Collection<String> topics;
-
- public NamedSubscription(Collection<String> topics) {
- this.topics = Collections.unmodifiableCollection(new ArrayList<>(topics));
- }
-
- public NamedSubscription(String ... topics) {
- this(Arrays.asList(topics));
- }
-
- @Override
- public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) {
- consumer.subscribe(topics, listener);
- LOG.info("Kafka consumer subscribed topics {}", topics);
-
- // Initial poll to get the consumer registration process going.
- // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
- consumer.poll(0);
- }
-
- @Override
- public String getTopicsString() {
- return String.join(",", topics);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
deleted file mode 100644
index 982828d..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * Filter that returns all partitions for the specified topics.
- */
-public class NamedTopicFilter implements TopicFilter {
-
- private final Set<String> topics;
-
- /**
- * Create filter based on a set of topic names.
- * @param topics The topic names the filter will pass.
- */
- public NamedTopicFilter(Set<String> topics) {
- this.topics = Collections.unmodifiableSet(topics);
- }
-
- /**
- * Convenience constructor.
- * @param topics The topic names the filter will pass.
- */
- public NamedTopicFilter(String... topics) {
- this(new HashSet<>(Arrays.asList(topics)));
- }
-
- @Override
- public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
- List<TopicPartition> allPartitions = new ArrayList<>();
- for (String topic : topics) {
- for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) {
- allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
- }
- }
- return allPartitions;
- }
-
- @Override
- public String getTopicsString() {
- return String.join(",", topics);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
deleted file mode 100644
index ec53f01..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.task.TopologyContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Subscribe to all topics that match a given pattern.
- */
-public class PatternSubscription extends Subscription {
- private static final Logger LOG = LoggerFactory.getLogger(PatternSubscription.class);
- private static final long serialVersionUID = 3438543305215813839L;
- protected final Pattern pattern;
-
- public PatternSubscription(Pattern pattern) {
- this.pattern = pattern;
- }
-
- @Override
- public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) {
- consumer.subscribe(pattern, listener);
- LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern);
-
- // Initial poll to get the consumer registration process going.
- // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
- consumer.poll(0);
- }
-
- @Override
- public String getTopicsString() {
- return pattern.pattern();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
deleted file mode 100644
index 2964874..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * Filter that returns all partitions for topics matching the given {@link Pattern}.
- */
-public class PatternTopicFilter implements TopicFilter {
-
- private final Pattern pattern;
- private final Set<String> topics = new HashSet<>();
-
- /**
- * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter.
- *
- * @param pattern The Pattern to use.
- */
- public PatternTopicFilter(Pattern pattern) {
- this.pattern = pattern;
- }
-
- @Override
- public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
- topics.clear();
- List<TopicPartition> allPartitions = new ArrayList<>();
- for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
- if (pattern.matcher(entry.getKey()).matches()) {
- for (PartitionInfo partitionInfo : entry.getValue()) {
- allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
- topics.add(partitionInfo.topic());
- }
- }
- }
- return allPartitions;
- }
-
- @Override
- public String getTopicsString() {
- return String.join(",", topics);
- }
-
- public String getTopicsPattern() {
- return pattern.pattern();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
deleted file mode 100644
index 4afcc49..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.task.TopologyContext;
-
-/**
- * Assign partitions in a round robin fashion for all spouts,
- * not just the ones that are alive. Because the parallelism of
- * the spouts does not typically change while running this makes
- * the assignments more stable in the face of crashing spouts.
- * <p/>
- * Round Robin means that first spout of N spouts will get the first
- * partition, and the N+1th partition... The second spout will get the second partition and
- * N+2th partition etc.
- */
-public class RoundRobinManualPartitioner implements ManualPartitioner {
-
- @Override
- public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
- int thisTaskIndex = context.getThisTaskIndex();
- int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
- Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size() / totalTaskCount + 1);
- for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) {
- myPartitions.add(allPartitions.get(i));
- }
- return new ArrayList<>(myPartitions);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
deleted file mode 100644
index 9c5a8c4..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.io.Serializable;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.task.TopologyContext;
-
-/**
- * A subscription to kafka.
- */
-public abstract class Subscription implements Serializable {
- private static final long serialVersionUID = -216136367240198716L;
-
- /**
- * Subscribe the KafkaConsumer to the proper topics
- * @param consumer the Consumer to get.
- * @param listener the rebalance listener to include in the subscription
- */
- public abstract <K, V> void subscribe(KafkaConsumer<K,V> consumer, ConsumerRebalanceListener listener, TopologyContext context);
-
- /**
- * @return A human-readable string representing the subscribed topics.
- */
- public abstract String getTopicsString();
-
- /**
- * NOOP is the default behavior, which means that Kafka will internally handle partition assignment.
- * If you wish to do manual partition management, you must provide an implementation of this method
- * that will check with kafka for any changes and call the ConsumerRebalanceListener from subscribe
- * to inform the rest of the system of those changes.
- */
- public void refreshAssignment() {
- //NOOP
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
deleted file mode 100644
index 7631c8a..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-
-public interface TopicFilter extends Serializable {
-
- /**
- * Get the Kafka TopicPartitions passed by this filter.
- * @param consumer The Kafka consumer to use to read the list of existing partitions
- * @return The Kafka partitions passed by this filter.
- */
- List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer);
-
- /**
- * @return A human-readable string representing the topics that pass the filter.
- */
- String getTopicsString();
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
new file mode 100644
index 0000000..17512ea
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.TopicPartitionComparator;
+import org.apache.storm.task.TopologyContext;
+
+public class ManualPartitionSubscription extends Subscription {
+ private static final long serialVersionUID = 5633018073527583826L;
+ private final ManualPartitioner partitioner;
+ private final TopicFilter partitionFilter;
+ private Set<TopicPartition> currentAssignment = null;
+ private KafkaConsumer<?, ?> consumer = null;
+ private ConsumerRebalanceListener listener = null;
+ private TopologyContext context = null;
+
+ public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) {
+ this.partitionFilter = partitionFilter;
+ this.partitioner = parter;
+ }
+
+ @Override
+ public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
+ this.consumer = consumer;
+ this.listener = listener;
+ this.context = context;
+ refreshAssignment();
+ }
+
+ @Override
+ public void refreshAssignment() {
+ List<TopicPartition> allPartitions = partitionFilter.getFilteredTopicPartitions(consumer);
+ Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
+ Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context));
+ if (!newAssignment.equals(currentAssignment)) {
+ consumer.assign(newAssignment);
+ if (currentAssignment != null) {
+ listener.onPartitionsRevoked(currentAssignment);
+ }
+ currentAssignment = newAssignment;
+ listener.onPartitionsAssigned(newAssignment);
+ }
+ }
+
+ @Override
+ public String getTopicsString() {
+ return partitionFilter.getTopicsString();
+ }
+}
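
As a usage note (not part of this patch), this class is exactly what the
KafkaSpoutConfig.Builder constructors compose: a TopicFilter decides which partitions
exist, and a ManualPartitioner decides which of them the current task owns. A minimal
sketch:

    import java.util.regex.Pattern;
    import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription;
    import org.apache.storm.kafka.spout.subscription.PatternTopicFilter;
    import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
    import org.apache.storm.kafka.spout.subscription.Subscription;

    public class SubscriptionWiring {
        // Same combination Builder(String, Pattern) creates in the diff above.
        public static Subscription forPattern(Pattern pattern) {
            return new ManualPartitionSubscription(
                new RoundRobinManualPartitioner(), new PatternTopicFilter(pattern));
        }
    }

Because refreshAssignment() re-runs the filter and only calls consumer.assign(...) when
the computed set changes, partitions of newly created matching topics are picked up
without a consumer group rebalance.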
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java
new file mode 100644
index 0000000..dce7fc6
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.util.List;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A function used to assign partitions to this spout.
+ * WARNING: if this is not done correctly you can really mess things up, e.g. by not reading data from some partitions.
+ * The complete TopologyContext is passed in, but it is suggested that you use the index of the spout and the total
+ * number of spouts to avoid missing partitions or double assigning partitions.
+ */
+@FunctionalInterface
+public interface ManualPartitioner {
+ /**
+ * Get the partitions for this assignment
+ * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering
+ * @param context the context of the topology
+ * @return the subset of the partitions that this spout should use.
+ */
+ public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context);
+}
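
Since this is a @FunctionalInterface, a custom policy can be supplied as a lambda. Below
is a sketch of a hypothetical contiguous-block policy, an alternative to the round robin
implementation in this patch; it assumes allPartitions arrives as the same sorted list on
every task, which ManualPartitionSubscription guarantees by sorting before partitioning.

    import java.util.ArrayList;
    import org.apache.storm.kafka.spout.subscription.ManualPartitioner;

    public class BlockManualPartitioner {
        // Task i takes the i-th contiguous slice of the sorted partition list.
        // The floor arithmetic assigns every partition to exactly one task.
        public static final ManualPartitioner BLOCK = (allPartitions, context) -> {
            int taskIndex = context.getThisTaskIndex();
            int taskCount = context.getComponentTasks(context.getThisComponentId()).size();
            int n = allPartitions.size();
            return new ArrayList<>(allPartitions.subList(
                taskIndex * n / taskCount, (taskIndex + 1) * n / taskCount));
        };
    }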
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
new file mode 100644
index 0000000..d6e5fc2
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Filter that returns all partitions for the specified topics.
+ */
+public class NamedTopicFilter implements TopicFilter {
+
+ private final Set<String> topics;
+
+ /**
+ * Create filter based on a set of topic names.
+ * @param topics The topic names the filter will pass.
+ */
+ public NamedTopicFilter(Set<String> topics) {
+ this.topics = Collections.unmodifiableSet(topics);
+ }
+
+ /**
+ * Convenience constructor.
+ * @param topics The topic names the filter will pass.
+ */
+ public NamedTopicFilter(String... topics) {
+ this(new HashSet<>(Arrays.asList(topics)));
+ }
+
+ @Override
+ public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
+ List<TopicPartition> allPartitions = new ArrayList<>();
+ for (String topic : topics) {
+ for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) {
+ allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+ }
+ }
+ return allPartitions;
+ }
+
+ @Override
+ public String getTopicsString() {
+ return String.join(",", topics);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
new file mode 100644
index 0000000..98f8b23
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Filter that returns all partitions for topics matching the given {@link Pattern}.
+ */
+public class PatternTopicFilter implements TopicFilter {
+
+ private final Pattern pattern;
+ private final Set<String> topics = new HashSet<>();
+
+ /**
+ * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter.
+ *
+ * @param pattern The Pattern to use.
+ */
+ public PatternTopicFilter(Pattern pattern) {
+ this.pattern = pattern;
+ }
+
+ @Override
+ public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
+ topics.clear();
+ List<TopicPartition> allPartitions = new ArrayList<>();
+ for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
+ if (pattern.matcher(entry.getKey()).matches()) {
+ for (PartitionInfo partitionInfo : entry.getValue()) {
+ allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+ topics.add(partitionInfo.topic());
+ }
+ }
+ }
+ return allPartitions;
+ }
+
+ @Override
+ public String getTopicsString() {
+ return String.join(",", topics);
+ }
+
+ public String getTopicsPattern() {
+ return pattern.pattern();
+ }
+}
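
One subtlety in the class above: the topics set is populated only as a side effect of
getFilteredTopicPartitions(...), so getTopicsString() reflects whatever the most recent
listing matched and is empty before the first call. A small demonstration sketch, with a
placeholder broker address:

    import java.util.Properties;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.storm.kafka.spout.subscription.PatternTopicFilter;

    public class PatternFilterDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka-broker:9092"); // placeholder
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            PatternTopicFilter filter = new PatternTopicFilter(Pattern.compile("events-.*"));
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                System.out.println(filter.getFilteredTopicPartitions(consumer));
                System.out.println(filter.getTopicsString()); // now lists matched topics
            }
        }
    }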
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java
new file mode 100644
index 0000000..9660c77
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * Assign partitions in a round robin fashion for all spouts,
+ * not just the ones that are alive. Because the parallelism of
+ * the spouts does not typically change while running this makes
+ * the assignments more stable in the face of crashing spouts.
+ * <p/>
+ * Round robin means that the first of N spouts gets the first
+ * partition, the (N+1)th partition, and so on. The second spout gets the second partition and
+ * the (N+2)th partition, etc.
+ */
+public class RoundRobinManualPartitioner implements ManualPartitioner {
+
+ @Override
+ public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
+ int thisTaskIndex = context.getThisTaskIndex();
+ int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
+ Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size() / totalTaskCount + 1);
+ for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) {
+ myPartitions.add(allPartitions.get(i));
+ }
+ return new ArrayList<>(myPartitions);
+ }
+}
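
To make the stride arithmetic concrete, a standalone sketch (plain Java, no Storm
dependencies) that reproduces the loop in partition() for 6 partitions and 4 tasks:

    public class RoundRobinDemo {
        public static void main(String[] args) {
            int partitions = 6;
            int tasks = 4;
            // Prints:
            // task 0 -> p0 p4
            // task 1 -> p1 p5
            // task 2 -> p2
            // task 3 -> p3
            for (int task = 0; task < tasks; task++) {
                StringBuilder owned = new StringBuilder("task " + task + " ->");
                for (int i = task; i < partitions; i += tasks) {
                    owned.append(" p").append(i);
                }
                System.out.println(owned);
            }
        }
    }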
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
new file mode 100644
index 0000000..8091bfa
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.io.Serializable;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A subscription to kafka.
+ */
+public abstract class Subscription implements Serializable {
+ private static final long serialVersionUID = -216136367240198716L;
+
+ /**
+ * Subscribe the KafkaConsumer to the proper topics.
+ * @param consumer the consumer to subscribe
+ * @param listener the rebalance listener to include in the subscription
+ */
+ public abstract <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context);
+
+ /**
+ * @return A human-readable string representing the subscribed topics.
+ */
+ public abstract String getTopicsString();
+
+ /**
+ * NOOP is the default behavior, which means that Kafka will internally handle partition assignment.
+ * If you wish to do manual partition management, you must provide an implementation of this method
+ * that will check with Kafka for any changes and call the ConsumerRebalanceListener from subscribe
+ * to inform the rest of the system of those changes.
+ */
+ public void refreshAssignment() {
+ //NOOP
+ }
+}
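
For completeness (not part of this patch), a toy Subscription illustrating the contract
from the consumer's side. It pins the task to one fixed partition; a real implementation
would derive the assignment from the TopologyContext, as ManualPartitionSubscription does.

    import java.util.Collections;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.subscription.Subscription;
    import org.apache.storm.task.TopologyContext;

    public class FixedPartitionSubscription extends Subscription {
        private static final long serialVersionUID = 1L;
        private final TopicPartition partition;

        public FixedPartitionSubscription(TopicPartition partition) {
            this.partition = partition;
        }

        @Override
        public <K, V> void subscribe(KafkaConsumer<K, V> consumer,
                ConsumerRebalanceListener listener, TopologyContext context) {
            Set<TopicPartition> assignment = Collections.singleton(partition);
            consumer.assign(assignment);
            // Notify the spout's listener so its offset bookkeeping is initialized,
            // mirroring what ManualPartitionSubscription does on (re)assignment.
            listener.onPartitionsAssigned(assignment);
        }

        @Override
        public String getTopicsString() {
            return partition.toString();
        }
    }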
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
new file mode 100644
index 0000000..497e3ca
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+public interface TopicFilter extends Serializable {
+
+ /**
+ * Get the Kafka TopicPartitions passed by this filter.
+ * @param consumer The Kafka consumer to use to read the list of existing partitions
+ * @return The Kafka partitions passed by this filter.
+ */
+ List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer);
+
+ /**
+ * @return A human-readable string representing the topics that pass the filter.
+ */
+ String getTopicsString();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
index 8dc34d4..7258fe2 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
@@ -26,16 +26,15 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
@@ -50,53 +49,38 @@ public class KafkaSpoutCommitTest {
private final Map<String, Object> conf = new HashMap<>();
private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
private KafkaConsumer<String, String> consumerMock;
- private KafkaSpout<String, String> spout;
- private KafkaSpoutConfig spoutConfig;
+ private KafkaSpoutConfig<String, String> spoutConfig;
@Captor
private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
- private void setupSpout(Set<TopicPartition> assignedPartitions) {
+ @Before
+ public void setUp() {
MockitoAnnotations.initMocks(this);
spoutConfig = getKafkaSpoutConfigBuilder(-1)
- .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
- .build();
-
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .build();
consumerMock = mock(KafkaConsumer.class);
- KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
-
- //Set up a spout listening to 1 topic partition
- spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
- spout.open(conf, contextMock, collectorMock);
- spout.activate();
-
- ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
- verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
- //Assign partitions to the spout
- ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
- consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
}
@Test
public void testCommitSuccessWithOffsetVoids() {
//Verify that the commit logic can handle offset voids
try (SimulatedTime simulatedTime = new SimulatedTime()) {
- setupSpout(Collections.singleton(partition));
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
// Offsets emitted are 0,1,2,3,4,<void>,8,9
for (int i = 0; i < 5; i++) {
- recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
}
for (int i = 8; i < 10; i++) {
- recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
}
records.put(partition, recordsForPartition);
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(records));
+ .thenReturn(new ConsumerRecords<>(records));
for (int i = 0; i < recordsForPartition.size(); i++) {
spout.nextTuple();
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
index 24a2eda..8e6d390 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -16,7 +16,6 @@
package org.apache.storm.kafka.spout;
import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
-import static org.mockito.Matchers.anyCollection;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.inOrder;
@@ -32,18 +31,16 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
@@ -56,45 +53,30 @@ public class KafkaSpoutEmitTest {
private final Map<String, Object> conf = new HashMap<>();
private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
private KafkaConsumer<String, String> consumerMock;
- private KafkaSpout<String, String> spout;
- private KafkaSpoutConfig spoutConfig;
+ private KafkaSpoutConfig<String, String> spoutConfig;
- private void setupSpout(Set<TopicPartition> assignedPartitions) {
+ @Before
+ public void setUp() {
spoutConfig = getKafkaSpoutConfigBuilder(-1)
.setOffsetCommitPeriodMs(offsetCommitPeriodMs)
.build();
-
consumerMock = mock(KafkaConsumer.class);
- KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
-
- //Set up a spout listening to 1 topic partition
- spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
- spout.open(conf, contextMock, collectorMock);
- spout.activate();
-
- ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
- verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
- //Assign partitions to the spout
- ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
- consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
}
@Test
public void testNextTupleEmitsAtMostOneTuple() {
//The spout should emit at most one message per call to nextTuple
//This is necessary for Storm to be able to throttle the spout according to maxSpoutPending
- setupSpout(Collections.singleton(partition));
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
}
records.put(partition, recordsForPartition);
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(records));
+ .thenReturn(new ConsumerRecords<>(records));
spout.nextTuple();
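The assertion behind this test follows directly: poll() returned ten records, but one nextTuple() call may emit at most one of them. A condensed sketch of the verification step the hunk above cuts off (the exact matchers in the real test may differ):

    // A single nextTuple() call must emit at most one tuple, so that Storm
    // can throttle the spout via maxSpoutPending.
    verify(collectorMock, atMost(1)).emit(anyString(), anyList(), anyObject());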
@@ -107,17 +89,17 @@ public class KafkaSpoutEmitTest {
//Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed
try (SimulatedTime simulatedTime = new SimulatedTime()) {
- setupSpout(Collections.singleton(partition));
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
//This is cheating a bit since maxPollRecords would normally spread this across multiple polls
- recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
}
records.put(partition, recordsForPartition);
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(records));
+ .thenReturn(new ConsumerRecords<>(records));
for (int i = 0; i < recordsForPartition.size(); i++) {
spout.nextTuple();
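These retry tests run inside SimulatedTime so the retry backoff can be crossed without real sleeps: Time.advanceTime(ms) moves Storm's simulated clock forward deterministically. The shape of the pattern, with an arbitrary 50 ms figure assumed to exceed the configured backoff:

    // After failing the emitted tuples, advance the simulated clock past the
    // retry backoff so the spout considers them retriable again.
    Time.advanceTime(50); // assumes the configured backoff is under 50 ms
    spout.nextTuple();    // should now re-emit one of the failed tuples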
@@ -172,13 +154,13 @@ public class KafkaSpoutEmitTest {
//Emit maxUncommittedOffsets messages, and fail only the last. Then ensure that the spout will allow no more than maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets when retrying
try (SimulatedTime simulatedTime = new SimulatedTime()) {
- setupSpout(Collections.singleton(partition));
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPollRecords = new HashMap<>();
List<ConsumerRecord<String, String>> firstPollRecordsForPartition = new ArrayList<>();
for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
//This is cheating a bit since maxPollRecords would normally spread this across multiple polls
- firstPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ firstPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
}
firstPollRecords.put(partition, firstPollRecordsForPartition);
@@ -186,13 +168,13 @@ public class KafkaSpoutEmitTest {
Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPollRecords = new HashMap<>();
List<ConsumerRecord<String, String>> secondPollRecordsForPartition = new ArrayList<>();
for(int i = 0; i < maxPollRecords; i++) {
- secondPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
+ secondPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
}
secondPollRecords.put(partition, secondPollRecordsForPartition);
when(consumerMock.poll(anyLong()))
- .thenReturn(new ConsumerRecords(firstPollRecords))
- .thenReturn(new ConsumerRecords(secondPollRecords));
+ .thenReturn(new ConsumerRecords<>(firstPollRecords))
+ .thenReturn(new ConsumerRecords<>(secondPollRecords));
for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() + maxPollRecords; i++) {
spout.nextTuple();
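To make that bound concrete: suppose maxUncommittedOffsets = 10 and maxPollRecords = 5, and the commit period has not yet elapsed, so nothing is committed. The spout emits offsets 0-9 and stops polling at the cap; only offset 9 fails. The retry seeks back to offset 9, and that poll may return offsets 9-13, i.e. the failed record plus maxPollRecords - 1 new ones, for 10 + 5 - 1 = 14 uncommitted offsets. That is why the loop drives nextTuple() maxUncommittedOffsets + maxPollRecords times: it gives the spout the chance to overshoot, and the test asserts it never exceeds the 14-offset ceiling.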