You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/07/18 21:25:26 UTC

[1/3] storm git commit: STORM-2541: Fix storm-kafka-client manual subscription not being able to start consuming

Repository: storm
Updated Branches:
  refs/heads/master 399e35f10 -> cd6ca3ef0


STORM-2541: Fix storm-kafka-client manual subscription not being able to start consuming


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c7b7b896
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c7b7b896
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c7b7b896

Branch: refs/heads/master
Commit: c7b7b896bb0101a1b1b56677b647550b615d515a
Parents: 399e35f
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Mon Jun 5 14:59:19 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Tue Jul 18 22:36:46 2017 +0200

----------------------------------------------------------------------
 .../spout/ManualPartitionNamedSubscription.java | 78 --------------------
 .../ManualPartitionPatternSubscription.java     | 76 -------------------
 .../spout/ManualPartitionSubscription.java      | 71 ++++++++++++++++++
 .../storm/kafka/spout/NamedTopicFilter.java     | 67 +++++++++++++++++
 .../storm/kafka/spout/PatternTopicFilter.java   | 69 +++++++++++++++++
 .../apache/storm/kafka/spout/Subscription.java  |  2 +-
 .../apache/storm/kafka/spout/TopicFilter.java   | 38 ++++++++++
 .../storm/kafka/spout/NamedTopicFilterTest.java | 69 +++++++++++++++++
 .../kafka/spout/PatternTopicFilterTest.java     | 73 ++++++++++++++++++
 9 files changed, 388 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
deleted file mode 100644
index 926fdf0..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.task.TopologyContext;
-
-public class ManualPartitionNamedSubscription extends NamedSubscription {
-    private static final long serialVersionUID = 5633018073527583826L;
-    private final ManualPartitioner partitioner;
-    private Set<TopicPartition> currentAssignment = null;
-    private KafkaConsumer<?, ?> consumer = null;
-    private ConsumerRebalanceListener listener = null;
-    private TopologyContext context = null;
-
-    public ManualPartitionNamedSubscription(ManualPartitioner parter, Collection<String> topics) {
-        super(topics);
-        this.partitioner = parter;
-    }
-    
-    public ManualPartitionNamedSubscription(ManualPartitioner parter, String ... topics) {
-        this(parter, Arrays.asList(topics));
-    }
-    
-    @Override
-    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
-        this.consumer = consumer;
-        this.listener = listener;
-        this.context = context;
-        refreshAssignment();
-    }
-    
-    @Override
-    public void refreshAssignment() {
-        List<TopicPartition> allPartitions = new ArrayList<>();
-        for (String topic : topics) {
-            for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) {
-                allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
-            }
-        }
-        Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
-        Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context));
-        if (!newAssignment.equals(currentAssignment)) {
-            if (currentAssignment != null) {
-                listener.onPartitionsRevoked(currentAssignment);
-                listener.onPartitionsAssigned(newAssignment);
-            }
-            currentAssignment = newAssignment;
-            consumer.assign(currentAssignment);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
deleted file mode 100644
index 2344477..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.task.TopologyContext;
-
-public class ManualPartitionPatternSubscription extends PatternSubscription {
-    private static final long serialVersionUID = 5633018073527583826L;
-    private final ManualPartitioner parter;
-    private Set<TopicPartition> currentAssignment = null;
-    private KafkaConsumer<?, ?> consumer = null;
-    private ConsumerRebalanceListener listener = null;
-    private TopologyContext context = null;
-
-    public ManualPartitionPatternSubscription(ManualPartitioner parter, Pattern pattern) {
-        super(pattern);
-        this.parter = parter;
-    }
-    
-    @Override
-    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
-        this.consumer = consumer;
-        this.listener = listener;
-        this.context = context;
-        refreshAssignment();
-    }
-    
-    @Override
-    public void refreshAssignment() {
-        List<TopicPartition> allPartitions = new ArrayList<>();
-        for (Map.Entry<String, List<PartitionInfo>> entry: consumer.listTopics().entrySet()) {
-            if (pattern.matcher(entry.getKey()).matches()) {
-                for (PartitionInfo partitionInfo: entry.getValue()) {
-                    allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
-                }
-            }
-        }
-        Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
-        Set<TopicPartition> newAssignment = new HashSet<>(parter.partition(allPartitions, context));
-        if (!newAssignment.equals(currentAssignment)) {
-            if (currentAssignment != null) {
-                listener.onPartitionsRevoked(currentAssignment);
-                listener.onPartitionsAssigned(newAssignment);
-            }
-            currentAssignment = newAssignment;
-            consumer.assign(currentAssignment);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
new file mode 100644
index 0000000..2c65d6d
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+public class ManualPartitionSubscription extends Subscription {
+    private static final long serialVersionUID = 5633018073527583826L;
+    private final ManualPartitioner partitioner;
+    private final TopicFilter partitionFilter;
+    private Set<TopicPartition> currentAssignment = null; // stays null until the first assignment has been made
+    private KafkaConsumer<?, ?> consumer = null;
+    private ConsumerRebalanceListener listener = null;
+    private TopologyContext context = null;
+
+    public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) {
+        this.partitioner = parter;
+        this.partitionFilter = partitionFilter;
+    }
+
+    @Override
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
+        this.context = context;
+        this.listener = listener;
+        this.consumer = consumer;
+        refreshAssignment(); // compute and apply the initial assignment right away
+    }
+
+    @Override
+    public void refreshAssignment() {
+        List<TopicPartition> candidates = partitionFilter.getFilteredTopicPartitions(consumer);
+        Collections.sort(candidates, TopicPartitionComparator.INSTANCE);
+        Set<TopicPartition> updated = new HashSet<>(partitioner.partition(candidates, context));
+        if (currentAssignment == null || !currentAssignment.equals(updated)) {
+            consumer.assign(updated); // reassign the consumer before the listener callbacks run
+            if (currentAssignment != null) { // nothing to revoke on the very first assignment
+                listener.onPartitionsRevoked(currentAssignment);
+            }
+            currentAssignment = updated;
+            listener.onPartitionsAssigned(updated);
+        }
+    }
+
+    @Override
+    public String getTopicsString() {
+        return partitionFilter.getTopicsString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
new file mode 100644
index 0000000..982828d
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+/** Filter that returns all partitions for the specified topics; topics with no partition list are skipped. */
+public class NamedTopicFilter implements TopicFilter {
+    private final Set<String> topics;
+
+    /**
+     * Create filter based on a set of topic names.
+     * @param topics The topic names the filter will pass.
+     */
+    public NamedTopicFilter(Set<String> topics) {
+        this.topics = Collections.unmodifiableSet(topics);
+    }
+
+    /**
+     * Convenience constructor taking the topic names as varargs.
+     * @param topics The topic names the filter will pass.
+     */
+    public NamedTopicFilter(String... topics) {
+        this(new HashSet<>(Arrays.asList(topics)));
+    }
+
+    @Override
+    public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
+        List<TopicPartition> allPartitions = new ArrayList<>();
+        for (String topic : topics) {
+            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+            if (partitionInfos != null) { // guard: partitionsFor may return null when there is no metadata for a topic
+                for (PartitionInfo partitionInfo : partitionInfos) {
+                    allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+                }
+            }
+        }
+        return allPartitions;
+    }
+
+    @Override
+    public String getTopicsString() {
+        return String.join(",", topics);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
new file mode 100644
index 0000000..2964874
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * {@link TopicFilter} that passes every partition of each topic whose name matches a {@link Pattern}.
+ */
+public class PatternTopicFilter implements TopicFilter {
+    private final Pattern pattern;
+    private final Set<String> topics = new HashSet<>(); // topics returned by the latest getFilteredTopicPartitions call
+
+    /**
+     * Creates a filter that only passes topics whose name matches the given Pattern.
+     * @param pattern The Pattern to use.
+     */
+    public PatternTopicFilter(Pattern pattern) {
+        this.pattern = pattern;
+    }
+
+    @Override
+    public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
+        topics.clear();
+        List<TopicPartition> matched = new ArrayList<>();
+        for (Map.Entry<String, List<PartitionInfo>> topicEntry : consumer.listTopics().entrySet()) {
+            if (!pattern.matcher(topicEntry.getKey()).matches()) {
+                continue; // topic name does not match, ignore all of its partitions
+            }
+            for (PartitionInfo info : topicEntry.getValue()) {
+                matched.add(new TopicPartition(info.topic(), info.partition()));
+                topics.add(info.topic());
+            }
+        }
+        return matched;
+    }
+
+    @Override
+    public String getTopicsString() {
+        return String.join(",", topics);
+    }
+
+    /** Returns the regular expression string of the Pattern this filter was created with. */
+    public String getTopicsPattern() {
+        return pattern.pattern();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
index 53e825a..9c5a8c4 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
@@ -37,7 +37,7 @@ public abstract class Subscription implements Serializable {
     public abstract <K, V> void subscribe(KafkaConsumer<K,V> consumer, ConsumerRebalanceListener listener, TopologyContext context);
     
     /**
-     * @return a string representing the subscribed topics.
+     * @return A human-readable string representing the subscribed topics.
      */
     public abstract String getTopicsString();
     

http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
new file mode 100644
index 0000000..7631c8a
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+/** Filter that decides which Kafka TopicPartitions are passed, using a consumer to look up existing partitions. */
+public interface TopicFilter extends Serializable {
+    /**
+     * Get the Kafka TopicPartitions passed by this filter.
+     * @param consumer The Kafka consumer to use to read the list of existing partitions
+     * @return The Kafka partitions passed by this filter.
+     */
+    List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer);
+
+    /**
+     * Get a description of the topics this filter passes.
+     * @return A human-readable string representing the topics that pass the filter.
+     */
+    String getTopicsString();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
new file mode 100644
index 0000000..e97c7e1
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Tests that {@link NamedTopicFilter} only returns partitions of the topics it was created with. */
+public class NamedTopicFilterTest {
+    private KafkaConsumer<?, ?> consumerMock;
+
+    @Before
+    public void setUp() {
+        consumerMock = mock(KafkaConsumer.class);
+    }
+
+    @Test
+    public void testFilter() {
+        final String firstTopic = "test-1";
+        final String secondTopic = "test-11";
+        final String otherTopic = "unmatched";
+        NamedTopicFilter filter = new NamedTopicFilter(firstTopic, secondTopic);
+
+        // The first named topic has a single partition, the second one has two.
+        List<PartitionInfo> secondTopicPartitions = new ArrayList<>();
+        secondTopicPartitions.add(createPartitionInfo(secondTopic, 0));
+        secondTopicPartitions.add(createPartitionInfo(secondTopic, 1));
+        when(consumerMock.partitionsFor(firstTopic)).thenReturn(Collections.singletonList(createPartitionInfo(firstTopic, 0)));
+        when(consumerMock.partitionsFor(secondTopic)).thenReturn(secondTopicPartitions);
+        // A topic the filter was not given is also available from the consumer, but must not show up in the result.
+        when(consumerMock.partitionsFor(otherTopic)).thenReturn(Collections.singletonList(createPartitionInfo(otherTopic, 0)));
+
+        List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
+
+        assertThat("Expected filter to pass only topics with exact name matches", matchedPartitions,
+            containsInAnyOrder(new TopicPartition(firstTopic, 0), new TopicPartition(secondTopic, 0), new TopicPartition(secondTopic, 1)));
+    }
+
+    /** Builds a PartitionInfo for the given topic and partition, passing null for the remaining constructor arguments. */
+    private PartitionInfo createPartitionInfo(String topic, int partition) {
+        return new PartitionInfo(topic, partition, null, null, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/c7b7b896/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
new file mode 100644
index 0000000..877efdc
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Tests that {@link PatternTopicFilter} only returns partitions of topics whose name matches its pattern. */
+public class PatternTopicFilterTest {
+    private KafkaConsumer<?, ?> consumerMock;
+
+    @Before
+    public void setUp() {
+        consumerMock = mock(KafkaConsumer.class);
+    }
+
+    @Test
+    public void testFilter() {
+        final String firstTopic = "test-1";
+        final String secondTopic = "test-11";
+        final String otherTopic = "unmatched";
+
+        PatternTopicFilter filter = new PatternTopicFilter(Pattern.compile("test-\\d+"));
+
+        // Two topics match the pattern (one and two partitions respectively); the third topic does not match.
+        List<PartitionInfo> secondTopicPartitions = new ArrayList<>();
+        secondTopicPartitions.add(createPartitionInfo(secondTopic, 0));
+        secondTopicPartitions.add(createPartitionInfo(secondTopic, 1));
+        Map<String, List<PartitionInfo>> topicListing = new HashMap<>();
+        topicListing.put(firstTopic, Collections.singletonList(createPartitionInfo(firstTopic, 0)));
+        topicListing.put(secondTopic, secondTopicPartitions);
+        topicListing.put(otherTopic, Collections.singletonList(createPartitionInfo(otherTopic, 0)));
+        when(consumerMock.listTopics()).thenReturn(topicListing);
+
+        List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
+
+        assertThat("Expected topic partitions matching the pattern to be passed by the filter", matchedPartitions,
+            containsInAnyOrder(new TopicPartition(firstTopic, 0), new TopicPartition(secondTopic, 0), new TopicPartition(secondTopic, 1)));
+    }
+
+    /** Builds a PartitionInfo for the given topic and partition, passing null for the remaining constructor arguments. */
+    private PartitionInfo createPartitionInfo(String topic, int partition) {
+        return new PartitionInfo(topic, partition, null, null, null);
+    }
+}


[3/3] storm git commit: Changelog: STORM-2541

Posted by sr...@apache.org.
Changelog: STORM-2541


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cd6ca3ef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cd6ca3ef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cd6ca3ef

Branch: refs/heads/master
Commit: cd6ca3ef040bb812670be13d3b08420e81f05797
Parents: b9aefbe
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Tue Jul 18 22:39:12 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Tue Jul 18 22:39:12 2017 +0200

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cd6ca3ef/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 586034d..a7f8330 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-2541: Fix storm-kafka-client manual subscription not being able to start consuming
  * STORM-2639: Kafka Spout incorrectly computes numCommittedOffsets due to voids in the topic (topic compaction)
  * STORM-2622: Add owner resource summary on storm UI
  * STORM-2634: Apply new code style to storm-sql-runtime


[2/3] storm git commit: Merge branch 'STORM-2541' of https://github.com/srdo/storm into STORM-2541-merge

Posted by sr...@apache.org.
Merge branch 'STORM-2541' of https://github.com/srdo/storm into STORM-2541-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b9aefbe3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b9aefbe3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b9aefbe3

Branch: refs/heads/master
Commit: b9aefbe3bff2ca7d2bf6fd4ee217e046e3f5071f
Parents: 399e35f c7b7b89
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Tue Jul 18 22:38:48 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Tue Jul 18 22:38:48 2017 +0200

----------------------------------------------------------------------
 .../spout/ManualPartitionNamedSubscription.java | 78 --------------------
 .../ManualPartitionPatternSubscription.java     | 76 -------------------
 .../spout/ManualPartitionSubscription.java      | 71 ++++++++++++++++++
 .../storm/kafka/spout/NamedTopicFilter.java     | 67 +++++++++++++++++
 .../storm/kafka/spout/PatternTopicFilter.java   | 69 +++++++++++++++++
 .../apache/storm/kafka/spout/Subscription.java  |  2 +-
 .../apache/storm/kafka/spout/TopicFilter.java   | 38 ++++++++++
 .../storm/kafka/spout/NamedTopicFilterTest.java | 69 +++++++++++++++++
 .../kafka/spout/PatternTopicFilterTest.java     | 73 ++++++++++++++++++
 9 files changed, 388 insertions(+), 155 deletions(-)
----------------------------------------------------------------------