You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2018/07/09 21:47:24 UTC
[1/3] storm git commit: STORM-3082: Added support to handle absent
topics
Repository: storm
Updated Branches:
refs/heads/master da9cb5490 -> 2e3f76735
STORM-3082: Added support to handle absent topics
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b82ffcff
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b82ffcff
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b82ffcff
Branch: refs/heads/master
Commit: b82ffcffa386277892c275bc31b267ef3dd6fa60
Parents: daec248
Author: Aniket Alhat <an...@rakuten.com>
Authored: Mon Jul 9 12:11:28 2018 +0900
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Mon Jul 9 23:35:40 2018 +0200
----------------------------------------------------------------------
.../kafka/spout/subscription/NamedTopicFilter.java | 12 ++++++++++--
.../spout/subscription/NamedTopicFilterTest.java | 15 +++++++++++++++
2 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b82ffcff/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
index 7c25596..277ebac 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
@@ -24,12 +24,15 @@ import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Filter that returns all partitions for the specified topics.
*/
public class NamedTopicFilter implements TopicFilter {
+ private static final Logger LOG = LoggerFactory.getLogger(NamedTopicFilter.class);
private final Set<String> topics;
/**
@@ -52,8 +55,13 @@ public class NamedTopicFilter implements TopicFilter {
public Set<TopicPartition> getAllSubscribedPartitions(KafkaConsumer<?, ?> consumer) {
Set<TopicPartition> allPartitions = new HashSet<>();
for (String topic : topics) {
- for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) {
- allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+ List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
+ if(partitionInfoList != null) {
+ for (PartitionInfo partitionInfo : partitionInfoList) {
+ allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+ }
+ } else {
+ LOG.warn("Topic {} not found, skipping addition of the topic", topic);
}
}
return allPartitions;
http://git-wip-us.apache.org/repos/asf/storm/blob/b82ffcff/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
index a30a23a..c7717f9 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
@@ -16,6 +16,7 @@
package org.apache.storm.kafka.spout.subscription;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
@@ -62,6 +63,20 @@ public class NamedTopicFilterTest {
}
+ @Test
+ public void testFilterOnAbsentTopic() {
+ String presentTopic = "present";
+ String absentTopic = "absent";
+
+ NamedTopicFilter filter = new NamedTopicFilter(presentTopic, absentTopic);
+ when(consumerMock.partitionsFor(presentTopic)).thenReturn(Collections.singletonList(createPartitionInfo(presentTopic, 2)));
+ when(consumerMock.partitionsFor(absentTopic)).thenReturn(null);
+
+ List<TopicPartition> presentPartitions = filter.getFilteredTopicPartitions(consumerMock);
+ assertThat("Expected filter to pass only topics which are present", presentPartitions,
+ contains(new TopicPartition(presentTopic, 2)));
+ }
+
private PartitionInfo createPartitionInfo(String topic, int partition) {
return new PartitionInfo(topic, partition, null, null, null);
}
[3/3] storm git commit: Merge branch 'STORM-3082-merge' into
asfgit-master
Posted by sr...@apache.org.
Merge branch 'STORM-3082-merge' into asfgit-master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2e3f7673
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2e3f7673
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2e3f7673
Branch: refs/heads/master
Commit: 2e3f767353be09bf08b2c74738523acfac4a9491
Parents: da9cb54 f6208b8
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Mon Jul 9 23:47:05 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Mon Jul 9 23:47:05 2018 +0200
----------------------------------------------------------------------
.../kafka/spout/subscription/NamedTopicFilter.java | 12 ++++++++++--
.../spout/subscription/NamedTopicFilterTest.java | 15 +++++++++++++++
2 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[2/3] storm git commit: STORM-3082: Port to master
Posted by sr...@apache.org.
STORM-3082: Port to master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f6208b84
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f6208b84
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f6208b84
Branch: refs/heads/master
Commit: f6208b84c5a4eda09e18bf9ee31e12f43802f304
Parents: b82ffcf
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Mon Jul 9 23:35:26 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Mon Jul 9 23:35:46 2018 +0200
----------------------------------------------------------------------
.../apache/storm/kafka/spout/subscription/NamedTopicFilter.java | 2 +-
.../storm/kafka/spout/subscription/NamedTopicFilterTest.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f6208b84/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
index 277ebac..1591265 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
@@ -56,7 +56,7 @@ public class NamedTopicFilter implements TopicFilter {
Set<TopicPartition> allPartitions = new HashSet<>();
for (String topic : topics) {
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
- if(partitionInfoList != null) {
+ if (partitionInfoList != null) {
for (PartitionInfo partitionInfo : partitionInfoList) {
allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f6208b84/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
index c7717f9..f449a84 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
@@ -72,7 +72,7 @@ public class NamedTopicFilterTest {
when(consumerMock.partitionsFor(presentTopic)).thenReturn(Collections.singletonList(createPartitionInfo(presentTopic, 2)));
when(consumerMock.partitionsFor(absentTopic)).thenReturn(null);
- List<TopicPartition> presentPartitions = filter.getFilteredTopicPartitions(consumerMock);
+ Set<TopicPartition> presentPartitions = filter.getAllSubscribedPartitions(consumerMock);
assertThat("Expected filter to pass only topics which are present", presentPartitions,
contains(new TopicPartition(presentTopic, 2)));
}