You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/24 05:34:02 UTC
kafka git commit: Cherry-pick KAFKA-4355: Skip topics that have no
partitions, by Eno Thereska
Repository: kafka
Updated Branches:
refs/heads/0.10.1 ecb51680a -> 68e7af812
Cherry-pick KAFKA-4355: Skip topics that have no partitions, by Eno Thereska
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68e7af81
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68e7af81
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68e7af81
Branch: refs/heads/0.10.1
Commit: 68e7af812b94494e5798f3644b569e454036ac12
Parents: ecb5168
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Nov 23 21:33:33 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 21:33:33 2016 -0800
----------------------------------------------------------------------
.../streams/processor/DefaultPartitionGrouper.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/68e7af81/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index f0fb38c..554f7d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -58,7 +58,8 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
Set<TopicPartition> group = new HashSet<>(topicGroup.size());
for (String topic : topicGroup) {
- if (partitionId < metadata.partitionsForTopic(topic).size()) {
+ List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
+ if (partitions != null && partitionId < partitions.size()) {
group.add(new TopicPartition(topic, partitionId));
}
}
@@ -77,12 +78,11 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
for (String topic : topics) {
List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
- if (partitions == null)
- throw new StreamsException("Topic not found during partition assignment: " + topic);
-
- int numPartitions = partitions.size();
- if (numPartitions > maxNumPartitions)
- maxNumPartitions = numPartitions;
+ if (partitions != null) {
+ int numPartitions = partitions.size();
+ if (numPartitions > maxNumPartitions)
+ maxNumPartitions = numPartitions;
+ }
}
return maxNumPartitions;
}