You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@storm.apache.org by sr...@apache.org on 2018/07/09 21:47:24 UTC

[1/3] storm git commit: STORM-3082: Added support to handle absent topics

Repository: storm
Updated Branches:
  refs/heads/master da9cb5490 -> 2e3f76735


STORM-3082: Added support to handle absent topics


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b82ffcff
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b82ffcff
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b82ffcff

Branch: refs/heads/master
Commit: b82ffcffa386277892c275bc31b267ef3dd6fa60
Parents: daec248
Author: Aniket Alhat <an...@rakuten.com>
Authored: Mon Jul 9 12:11:28 2018 +0900
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Mon Jul 9 23:35:40 2018 +0200

----------------------------------------------------------------------
 .../kafka/spout/subscription/NamedTopicFilter.java   | 12 ++++++++++--
 .../spout/subscription/NamedTopicFilterTest.java     | 15 +++++++++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b82ffcff/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
index 7c25596..277ebac 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
@@ -24,12 +24,15 @@ import java.util.Set;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Filter that returns all partitions for the specified topics.
  */
 public class NamedTopicFilter implements TopicFilter {
 
+    private static final Logger LOG = LoggerFactory.getLogger(NamedTopicFilter.class);
     private final Set<String> topics;
     
     /**
@@ -52,8 +55,13 @@ public class NamedTopicFilter implements TopicFilter {
     public Set<TopicPartition> getAllSubscribedPartitions(KafkaConsumer<?, ?> consumer) {
         Set<TopicPartition> allPartitions = new HashSet<>();
         for (String topic : topics) {
-            for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) {
-                allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+            List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
+            if(partitionInfoList != null) {
+                for (PartitionInfo partitionInfo : partitionInfoList) {
+                    allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+                }
+            } else {
+                LOG.warn("Topic {} not found, skipping addition of the topic", topic);
             }
         }
         return allPartitions;

http://git-wip-us.apache.org/repos/asf/storm/blob/b82ffcff/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
index a30a23a..c7717f9 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
@@ -16,6 +16,7 @@
 
 package org.apache.storm.kafka.spout.subscription;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
@@ -62,6 +63,20 @@ public class NamedTopicFilterTest {
             
     }
     
+    @Test
+    public void testFilterOnAbsentTopic() {
+        String presentTopic = "present";
+        String absentTopic = "absent";
+        
+        NamedTopicFilter filter = new NamedTopicFilter(presentTopic, absentTopic);
+        when(consumerMock.partitionsFor(presentTopic)).thenReturn(Collections.singletonList(createPartitionInfo(presentTopic, 2)));
+        when(consumerMock.partitionsFor(absentTopic)).thenReturn(null);
+        
+        List<TopicPartition> presentPartitions = filter.getFilteredTopicPartitions(consumerMock);
+        assertThat("Expected filter to pass only topics which are present", presentPartitions,
+            contains(new TopicPartition(presentTopic, 2)));
+    }
+    
     private PartitionInfo createPartitionInfo(String topic, int partition) {
         return new PartitionInfo(topic, partition, null, null, null);
     }


[3/3] storm git commit: Merge branch 'STORM-3082-merge' into asfgit-master

Posted by sr...@apache.org.
Merge branch 'STORM-3082-merge' into asfgit-master


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2e3f7673
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2e3f7673
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2e3f7673

Branch: refs/heads/master
Commit: 2e3f767353be09bf08b2c74738523acfac4a9491
Parents: da9cb54 f6208b8
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Mon Jul 9 23:47:05 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Mon Jul 9 23:47:05 2018 +0200

----------------------------------------------------------------------
 .../kafka/spout/subscription/NamedTopicFilter.java   | 12 ++++++++++--
 .../spout/subscription/NamedTopicFilterTest.java     | 15 +++++++++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[2/3] storm git commit: STORM-3082: Port to master

Posted by sr...@apache.org.
STORM-3082: Port to master


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f6208b84
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f6208b84
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f6208b84

Branch: refs/heads/master
Commit: f6208b84c5a4eda09e18bf9ee31e12f43802f304
Parents: b82ffcf
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Mon Jul 9 23:35:26 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Mon Jul 9 23:35:46 2018 +0200

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/subscription/NamedTopicFilter.java    | 2 +-
 .../storm/kafka/spout/subscription/NamedTopicFilterTest.java       | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f6208b84/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
index 277ebac..1591265 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
@@ -56,7 +56,7 @@ public class NamedTopicFilter implements TopicFilter {
         Set<TopicPartition> allPartitions = new HashSet<>();
         for (String topic : topics) {
             List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
-            if(partitionInfoList != null) {
+            if (partitionInfoList != null) {
                 for (PartitionInfo partitionInfo : partitionInfoList) {
                     allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f6208b84/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
index c7717f9..f449a84 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
@@ -72,7 +72,7 @@ public class NamedTopicFilterTest {
         when(consumerMock.partitionsFor(presentTopic)).thenReturn(Collections.singletonList(createPartitionInfo(presentTopic, 2)));
         when(consumerMock.partitionsFor(absentTopic)).thenReturn(null);
         
-        List<TopicPartition> presentPartitions = filter.getFilteredTopicPartitions(consumerMock);
+        Set<TopicPartition> presentPartitions = filter.getAllSubscribedPartitions(consumerMock);
         assertThat("Expected filter to pass only topics which are present", presentPartitions,
             contains(new TopicPartition(presentTopic, 2)));
     }