You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/04 23:14:13 UTC

kafka git commit: HOTFIX: fix partition ordering in assignment

Repository: kafka
Updated Branches:
  refs/heads/trunk 287e45ab4 -> 0a7b20e28


HOTFIX: fix partition ordering in assignment

workround partition ordering not preserved by the consumer group management.
guozhangwang

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #868 from ymatsuda/partitionOrder


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0a7b20e2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0a7b20e2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0a7b20e2

Branch: refs/heads/trunk
Commit: 0a7b20e2863b2519e63d85bc83610e63c58c6d46
Parents: 287e45a
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Thu Feb 4 14:14:08 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Feb 4 14:14:08 2016 -0800

----------------------------------------------------------------------
 .../internals/StreamPartitionAssignor.java      | 47 +++++++++++++++++---
 1 file changed, 42 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0a7b20e2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 74770a5..e600cf7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -50,6 +50,7 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -62,6 +63,34 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
 
+    private static class AssignedPartition implements Comparable<AssignedPartition> {
+        public final TaskId taskId;
+        public final TopicPartition partition;
+
+        public AssignedPartition(TaskId taskId, TopicPartition partition) {
+            this.taskId = taskId;
+            this.partition = partition;
+        }
+
+        @Override
+        public int compareTo(AssignedPartition that) {
+            return PARTITION_COMPARATOR.compare(this.partition, that.partition);
+        }
+    }
+
+    private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
+        @Override
+        public int compare(TopicPartition p1, TopicPartition p2) {
+            int result = p1.topic().compareTo(p2.topic());
+
+            if (result != 0) {
+                return result;
+            } else {
+                return p1.partition() < p2.partition() ? -1 : (p1.partition() > p2.partition() ? 1 : 0);
+            }
+        }
+    };
+
     private StreamThread streamThread;
 
     private int numStandbyReplicas;
@@ -341,20 +370,18 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
 
             final int numConsumers = consumers.size();
-            List<TaskId> active = new ArrayList<>();
             Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
 
             int i = 0;
             for (String consumer : consumers) {
-                List<TopicPartition> activePartitions = new ArrayList<>();
+                ArrayList<AssignedPartition> assignedPartitions = new ArrayList<>();
 
                 final int numTaskIds = taskIds.size();
                 for (int j = i; j < numTaskIds; j += numConsumers) {
                     TaskId taskId = taskIds.get(j);
                     if (j < numActiveTasks) {
                         for (TopicPartition partition : partitionsForTask.get(taskId)) {
-                            activePartitions.add(partition);
-                            active.add(taskId);
+                            assignedPartitions.add(new AssignedPartition(taskId, partition));
                         }
                     } else {
                         Set<TopicPartition> standbyPartitions = standby.get(taskId);
@@ -366,6 +393,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                     }
                 }
 
+                Collections.sort(assignedPartitions);
+                List<TaskId> active = new ArrayList<>();
+                List<TopicPartition> activePartitions = new ArrayList<>();
+                for (AssignedPartition partition : assignedPartitions) {
+                    active.add(partition.taskId);
+                    activePartitions.add(partition.partition);
+                }
+
                 AssignmentInfo data = new AssignmentInfo(active, standby);
                 assignment.put(consumer, new Assignment(activePartitions, data.encode()));
                 i++;
@@ -441,7 +476,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     @Override
     public void onAssignment(Assignment assignment) {
-        List<TopicPartition> partitions = assignment.partitions();
+        List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
+
+        Collections.sort(partitions, PARTITION_COMPARATOR);
 
         AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
         this.standbyTasks = info.standbyTasks;