You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/07/18 21:25:26 UTC
[1/3] storm git commit: STORM-2541: Fix storm-kafka-client manual
subscription not being able to start consuming
Repository: storm
Updated Branches:
refs/heads/master 399e35f10 -> cd6ca3ef0
STORM-2541: Fix storm-kafka-client manual subscription not being able to start consuming
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c7b7b896
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c7b7b896
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c7b7b896
Branch: refs/heads/master
Commit: c7b7b896bb0101a1b1b56677b647550b615d515a
Parents: 399e35f
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Mon Jun 5 14:59:19 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Tue Jul 18 22:36:46 2017 +0200
----------------------------------------------------------------------
.../spout/ManualPartitionNamedSubscription.java | 78 --------------------
.../ManualPartitionPatternSubscription.java | 76 -------------------
.../spout/ManualPartitionSubscription.java | 71 ++++++++++++++++++
.../storm/kafka/spout/NamedTopicFilter.java | 67 +++++++++++++++++
.../storm/kafka/spout/PatternTopicFilter.java | 69 +++++++++++++++++
.../apache/storm/kafka/spout/Subscription.java | 2 +-
.../apache/storm/kafka/spout/TopicFilter.java | 38 ++++++++++
.../storm/kafka/spout/NamedTopicFilterTest.java | 69 +++++++++++++++++
.../kafka/spout/PatternTopicFilterTest.java | 73 ++++++++++++++++++
9 files changed, 388 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
deleted file mode 100644
index 926fdf0..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.task.TopologyContext;
-
-public class ManualPartitionNamedSubscription extends NamedSubscription {
- private static final long serialVersionUID = 5633018073527583826L;
- private final ManualPartitioner partitioner;
- private Set<TopicPartition> currentAssignment = null;
- private KafkaConsumer<?, ?> consumer = null;
- private ConsumerRebalanceListener listener = null;
- private TopologyContext context = null;
-
- public ManualPartitionNamedSubscription(ManualPartitioner parter, Collection<String> topics) {
- super(topics);
- this.partitioner = parter;
- }
-
- public ManualPartitionNamedSubscription(ManualPartitioner parter, String ... topics) {
- this(parter, Arrays.asList(topics));
- }
-
- @Override
- public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
- this.consumer = consumer;
- this.listener = listener;
- this.context = context;
- refreshAssignment();
- }
-
- @Override
- public void refreshAssignment() {
- List<TopicPartition> allPartitions = new ArrayList<>();
- for (String topic : topics) {
- for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) {
- allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
- }
- }
- Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
- Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context));
- if (!newAssignment.equals(currentAssignment)) {
- if (currentAssignment != null) {
- listener.onPartitionsRevoked(currentAssignment);
- listener.onPartitionsAssigned(newAssignment);
- }
- currentAssignment = newAssignment;
- consumer.assign(currentAssignment);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
deleted file mode 100644
index 2344477..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.task.TopologyContext;
-
-public class ManualPartitionPatternSubscription extends PatternSubscription {
- private static final long serialVersionUID = 5633018073527583826L;
- private final ManualPartitioner parter;
- private Set<TopicPartition> currentAssignment = null;
- private KafkaConsumer<?, ?> consumer = null;
- private ConsumerRebalanceListener listener = null;
- private TopologyContext context = null;
-
- public ManualPartitionPatternSubscription(ManualPartitioner parter, Pattern pattern) {
- super(pattern);
- this.parter = parter;
- }
-
- @Override
- public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
- this.consumer = consumer;
- this.listener = listener;
- this.context = context;
- refreshAssignment();
- }
-
- @Override
- public void refreshAssignment() {
- List<TopicPartition> allPartitions = new ArrayList<>();
- for (Map.Entry<String, List<PartitionInfo>> entry: consumer.listTopics().entrySet()) {
- if (pattern.matcher(entry.getKey()).matches()) {
- for (PartitionInfo partitionInfo: entry.getValue()) {
- allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
- }
- }
- }
- Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
- Set<TopicPartition> newAssignment = new HashSet<>(parter.partition(allPartitions, context));
- if (!newAssignment.equals(currentAssignment)) {
- if (currentAssignment != null) {
- listener.onPartitionsRevoked(currentAssignment);
- listener.onPartitionsAssigned(newAssignment);
- }
- currentAssignment = newAssignment;
- consumer.assign(currentAssignment);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
new file mode 100644
index 0000000..2c65d6d
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+public class ManualPartitionSubscription extends Subscription {
+ private static final long serialVersionUID = 5633018073527583826L;
+ private final ManualPartitioner partitioner;
+ private final TopicFilter partitionFilter;
+ private Set<TopicPartition> currentAssignment = null;
+ private KafkaConsumer<?, ?> consumer = null;
+ private ConsumerRebalanceListener listener = null;
+ private TopologyContext context = null;
+
+ public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) {
+ this.partitionFilter = partitionFilter;
+ this.partitioner = parter;
+ }
+
+ @Override
+ public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
+ this.consumer = consumer;
+ this.listener = listener;
+ this.context = context;
+ refreshAssignment();
+ }
+
+ @Override
+ public void refreshAssignment() {
+ List<TopicPartition> allPartitions = partitionFilter.getFilteredTopicPartitions(consumer);
+ Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
+ Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context));
+ if (!newAssignment.equals(currentAssignment)) {
+ consumer.assign(newAssignment);
+ if (currentAssignment != null) {
+ listener.onPartitionsRevoked(currentAssignment);
+ }
+ currentAssignment = newAssignment;
+ listener.onPartitionsAssigned(newAssignment);
+ }
+ }
+
+ @Override
+ public String getTopicsString() {
+ return partitionFilter.getTopicsString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
new file mode 100644
index 0000000..982828d
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Filter that returns all partitions for the specified topics.
+ */
+public class NamedTopicFilter implements TopicFilter {
+
+ private final Set<String> topics;
+
+ /**
+ * Create filter based on a set of topic names.
+ * @param topics The topic names the filter will pass.
+ */
+ public NamedTopicFilter(Set<String> topics) {
+ this.topics = Collections.unmodifiableSet(topics);
+ }
+
+ /**
+ * Convenience constructor.
+ * @param topics The topic names the filter will pass.
+ */
+ public NamedTopicFilter(String... topics) {
+ this(new HashSet<>(Arrays.asList(topics)));
+ }
+
+ @Override
+ public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
+ List<TopicPartition> allPartitions = new ArrayList<>();
+ for (String topic : topics) {
+ for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) {
+ allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+ }
+ }
+ return allPartitions;
+ }
+
+ @Override
+ public String getTopicsString() {
+ return String.join(",", topics);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
new file mode 100644
index 0000000..2964874
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Filter that returns all partitions for topics matching the given {@link Pattern}.
+ */
+public class PatternTopicFilter implements TopicFilter {
+
+ private final Pattern pattern;
+ private final Set<String> topics = new HashSet<>();
+
+ /**
+ * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter.
+ *
+ * @param pattern The Pattern to use.
+ */
+ public PatternTopicFilter(Pattern pattern) {
+ this.pattern = pattern;
+ }
+
+ @Override
+ public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
+ topics.clear();
+ List<TopicPartition> allPartitions = new ArrayList<>();
+ for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
+ if (pattern.matcher(entry.getKey()).matches()) {
+ for (PartitionInfo partitionInfo : entry.getValue()) {
+ allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+ topics.add(partitionInfo.topic());
+ }
+ }
+ }
+ return allPartitions;
+ }
+
+ @Override
+ public String getTopicsString() {
+ return String.join(",", topics);
+ }
+
+ public String getTopicsPattern() {
+ return pattern.pattern();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
index 53e825a..9c5a8c4 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
@@ -37,7 +37,7 @@ public abstract class Subscription implements Serializable {
public abstract <K, V> void subscribe(KafkaConsumer<K,V> consumer, ConsumerRebalanceListener listener, TopologyContext context);
/**
- * @return a string representing the subscribed topics.
+ * @return A human-readable string representing the subscribed topics.
*/
public abstract String getTopicsString();
http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
new file mode 100644
index 0000000..7631c8a
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+public interface TopicFilter extends Serializable {
+
+ /**
+ * Get the Kafka TopicPartitions passed by this filter.
+ * @param consumer The Kafka consumer to use to read the list of existing partitions
+ * @return The Kafka partitions passed by this filter.
+ */
+ List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer);
+
+ /**
+ * @return A human-readable string representing the topics that pass the filter.
+ */
+ String getTopicsString();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
new file mode 100644
index 0000000..e97c7e1
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NamedTopicFilterTest {
+
+ private KafkaConsumer<?, ?> consumerMock;
+
+ @Before
+ public void setUp() {
+ consumerMock = mock(KafkaConsumer.class);
+ }
+
+ @Test
+ public void testFilter() {
+ String matchingTopicOne = "test-1";
+ String matchingTopicTwo = "test-11";
+ String unmatchedTopic = "unmatched";
+
+ NamedTopicFilter filter = new NamedTopicFilter(matchingTopicOne, matchingTopicTwo);
+
+ when(consumerMock.partitionsFor(matchingTopicOne)).thenReturn(Collections.singletonList(createPartitionInfo(matchingTopicOne, 0)));
+ List<PartitionInfo> partitionTwoPartitions = new ArrayList<>();
+ partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
+ partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
+ when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions);
+ when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
+
+ List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
+
+ assertThat("Expected filter to pass only topics with exact name matches", matchedPartitions,
+ containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
+
+ }
+
+ private PartitionInfo createPartitionInfo(String topic, int partition) {
+ return new PartitionInfo(topic, partition, null, null, null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
new file mode 100644
index 0000000..877efdc
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PatternTopicFilterTest {
+
+ private KafkaConsumer<?, ?> consumerMock;
+
+ @Before
+ public void setUp(){
+ consumerMock = mock(KafkaConsumer.class);
+ }
+
+ @Test
+ public void testFilter() {
+ Pattern pattern = Pattern.compile("test-\\d+");
+ PatternTopicFilter filter = new PatternTopicFilter(pattern);
+
+ String matchingTopicOne = "test-1";
+ String matchingTopicTwo = "test-11";
+ String unmatchedTopic = "unmatched";
+
+ Map<String, List<PartitionInfo>> allTopics = new HashMap<>();
+ allTopics.put(matchingTopicOne, Collections.singletonList(createPartitionInfo(matchingTopicOne, 0)));
+ List<PartitionInfo> testTwoPartitions = new ArrayList<>();
+ testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
+ testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
+ allTopics.put(matchingTopicTwo, testTwoPartitions);
+ allTopics.put(unmatchedTopic, Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
+
+ when(consumerMock.listTopics()).thenReturn(allTopics);
+
+ List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
+
+ assertThat("Expected topic partitions matching the pattern to be passed by the filter", matchedPartitions,
+ containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
+ }
+
+ private PartitionInfo createPartitionInfo(String topic, int partition) {
+ return new PartitionInfo(topic, partition, null, null, null);
+ }
+}
[3/3] storm git commit: Changelog: STORM-2541
Posted by sr...@apache.org.
Changelog: STORM-2541
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cd6ca3ef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cd6ca3ef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cd6ca3ef
Branch: refs/heads/master
Commit: cd6ca3ef040bb812670be13d3b08420e81f05797
Parents: b9aefbe
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Tue Jul 18 22:39:12 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Tue Jul 18 22:39:12 2017 +0200
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cd6ca3ef/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 586034d..a7f8330 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-2541: Fix storm-kafka-client manual subscription not being able to start consuming
* STORM-2639: Kafka Spout incorrectly computes numCommittedOffsets due to voids in the topic (topic compaction)
* STORM-2622: Add owner resource summary on storm UI
* STORM-2634: Apply new code style to storm-sql-runtime
[2/3] storm git commit: Merge branch 'STORM-2541' of
https://github.com/srdo/storm into STORM-2541-merge
Posted by sr...@apache.org.
Merge branch 'STORM-2541' of https://github.com/srdo/storm into STORM-2541-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b9aefbe3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b9aefbe3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b9aefbe3
Branch: refs/heads/master
Commit: b9aefbe3bff2ca7d2bf6fd4ee217e046e3f5071f
Parents: 399e35f c7b7b89
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Tue Jul 18 22:38:48 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Tue Jul 18 22:38:48 2017 +0200
----------------------------------------------------------------------
.../spout/ManualPartitionNamedSubscription.java | 78 --------------------
.../ManualPartitionPatternSubscription.java | 76 -------------------
.../spout/ManualPartitionSubscription.java | 71 ++++++++++++++++++
.../storm/kafka/spout/NamedTopicFilter.java | 67 +++++++++++++++++
.../storm/kafka/spout/PatternTopicFilter.java | 69 +++++++++++++++++
.../apache/storm/kafka/spout/Subscription.java | 2 +-
.../apache/storm/kafka/spout/TopicFilter.java | 38 ++++++++++
.../storm/kafka/spout/NamedTopicFilterTest.java | 69 +++++++++++++++++
.../kafka/spout/PatternTopicFilterTest.java | 73 ++++++++++++++++++
9 files changed, 388 insertions(+), 155 deletions(-)
----------------------------------------------------------------------