You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/11/29 19:07:43 UTC

kafka git commit: KAFKA-4427: Skip topic groups with no tasks

Repository: kafka
Updated Branches:
  refs/heads/trunk 7d3aa01ce -> 7d0f3e75a


KAFKA-4427: Skip topic groups with no tasks

Author: Eno Thereska <en...@gmail.com>

Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #2171 from enothereska/KAFKA-4427-topicgroups-with-no-tasks


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d0f3e75
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d0f3e75
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d0f3e75

Branch: refs/heads/trunk
Commit: 7d0f3e75ad2934b8ad5b84f81707b01285022523
Parents: 7d3aa01
Author: Eno Thereska <en...@gmail.com>
Authored: Tue Nov 29 11:07:39 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 29 11:07:39 2016 -0800

----------------------------------------------------------------------
 .../internals/StreamPartitionAssignor.java      | 19 ++++----
 .../SingleGroupPartitionGrouperStub.java        | 47 ++++++++++++++++++++
 .../internals/StreamPartitionAssignorTest.java  | 40 +++++++++++++++++
 3 files changed, 98 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7d0f3e75/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index b06d7f7..84f78dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -430,15 +430,18 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             for (InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
                 // the expected number of partitions is the max value of TaskId.partition + 1
                 int numPartitions = -1;
-                for (TaskId task : tasksByTopicGroup.get(topicGroupId)) {
-                    if (numPartitions < task.partition + 1)
-                        numPartitions = task.partition + 1;
-                }
-
-                InternalTopicMetadata topicMetadata = new InternalTopicMetadata(topicConfig);
-                topicMetadata.numPartitions = numPartitions;
+                if (tasksByTopicGroup.get(topicGroupId) != null) {
+                    for (TaskId task : tasksByTopicGroup.get(topicGroupId)) {
+                        if (numPartitions < task.partition + 1)
+                            numPartitions = task.partition + 1;
+                    }
+                    InternalTopicMetadata topicMetadata = new InternalTopicMetadata(topicConfig);
+                    topicMetadata.numPartitions = numPartitions;
 
-                changelogTopicMetadata.put(topicConfig.name(), topicMetadata);
+                    changelogTopicMetadata.put(topicConfig.name(), topicMetadata);
+                } else {
+                    log.debug("stream-thread [{}] No tasks found for topic group {}", streamThread.getName(), topicGroupId);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d0f3e75/streams/src/test/java/org/apache/kafka/streams/processor/internals/SingleGroupPartitionGrouperStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SingleGroupPartitionGrouperStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SingleGroupPartitionGrouperStub.java
new file mode 100644
index 0000000..0da5974
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SingleGroupPartitionGrouperStub.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
+import org.apache.kafka.streams.processor.PartitionGrouper;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Test stub that hands only one topic group (the first one iterated) to a {@link DefaultPartitionGrouper}, for testing assignment of a subset of the topology, not the entire topology.
+ */
+public class SingleGroupPartitionGrouperStub implements PartitionGrouper {
+    private PartitionGrouper defaultPartitionGrouper = new DefaultPartitionGrouper(); // delegate that performs the actual grouping
+
+    @Override
+    public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
+        Map<Integer, Set<String>> includedTopicGroups = new HashMap<>(); // ends up with at most one entry
+
+        for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
+            includedTopicGroups.put(entry.getKey(), entry.getValue());
+            break; // arbitrarily use the first entry only; which entry is "first" depends on the map's iteration order
+        }
+        Map<TaskId, Set<TopicPartition>> result = defaultPartitionGrouper.partitionGroups(includedTopicGroups, metadata); // delegate sees only the retained group
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d0f3e75/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 65cc628..8e50356 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -215,6 +215,46 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
+    public void testAssignWithPartialTopology() throws Exception {
+        Properties props = configProps();
+        props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class); // stub forwards only one topic group to the default grouper
+        StreamsConfig config = new StreamsConfig(props);
+
+        TopologyBuilder builder = new TopologyBuilder(); // two separate source -> processor -> state store chains follow
+        builder.addSource("source1", "topic1");
+        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
+        builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor1");
+        builder.addSource("source2", "topic2");
+        builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
+        builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor2");
+        List<String> topics = Utils.mkList("topic1", "topic2");
+        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); // expected active tasks, compared at the end of the test
+
+        UUID uuid1 = UUID.randomUUID();
+        String client1 = "client1";
+
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); // a single consumer subscribed to both topics, with no previous or standby tasks
+        subscriptions.put("consumer10",
+            new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
+
+        // the call itself is part of the test: assign() must complete without throwing
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+        // check assignment info: consumer10's active tasks must be exactly task0, task1 and task2
+        Set<TaskId> allActiveTasks = new HashSet<>();
+        AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10"));
+        allActiveTasks.addAll(info10.activeTasks);
+
+        assertEquals(3, allActiveTasks.size());
+        assertEquals(allTasks, new HashSet<>(allActiveTasks));
+    }
+
+
+    @Test
     public void testAssignEmptyMetadata() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());