You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/24 05:34:02 UTC

kafka git commit: Cherry-pick KAFKA-4355: Skip topics that have no partitions, by Eno Thereska

Repository: kafka
Updated Branches:
  refs/heads/0.10.1 ecb51680a -> 68e7af812


Cherry-pick KAFKA-4355: Skip topics that have no partitions, by Eno Thereska


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68e7af81
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68e7af81
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68e7af81

Branch: refs/heads/0.10.1
Commit: 68e7af812b94494e5798f3644b569e454036ac12
Parents: ecb5168
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Nov 23 21:33:33 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 23 21:33:33 2016 -0800

----------------------------------------------------------------------
 .../streams/processor/DefaultPartitionGrouper.java    | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/68e7af81/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index f0fb38c..554f7d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -58,7 +58,8 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
                 Set<TopicPartition> group = new HashSet<>(topicGroup.size());
 
                 for (String topic : topicGroup) {
-                    if (partitionId < metadata.partitionsForTopic(topic).size()) {
+                    List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
+                    if (partitions != null && partitionId < partitions.size()) {
                         group.add(new TopicPartition(topic, partitionId));
                     }
                 }
@@ -77,12 +78,11 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
         for (String topic : topics) {
             List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
 
-            if (partitions == null)
-                throw new StreamsException("Topic not found during partition assignment: " + topic);
-
-            int numPartitions = partitions.size();
-            if (numPartitions > maxNumPartitions)
-                maxNumPartitions = numPartitions;
+            if (partitions != null) {
+                int numPartitions = partitions.size();
+                if (numPartitions > maxNumPartitions)
+                    maxNumPartitions = numPartitions;
+            }
         }
         return maxNumPartitions;
     }