You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/04 23:14:13 UTC
kafka git commit: HOTFIX: fix partition ordering in assignment
Repository: kafka
Updated Branches:
refs/heads/trunk 287e45ab4 -> 0a7b20e28
HOTFIX: fix partition ordering in assignment
workround partition ordering not preserved by the consumer group management.
guozhangwang
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #868 from ymatsuda/partitionOrder
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0a7b20e2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0a7b20e2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0a7b20e2
Branch: refs/heads/trunk
Commit: 0a7b20e2863b2519e63d85bc83610e63c58c6d46
Parents: 287e45a
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Thu Feb 4 14:14:08 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Feb 4 14:14:08 2016 -0800
----------------------------------------------------------------------
.../internals/StreamPartitionAssignor.java | 47 +++++++++++++++++---
1 file changed, 42 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0a7b20e2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 74770a5..e600cf7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -50,6 +50,7 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -62,6 +63,34 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
+ private static class AssignedPartition implements Comparable<AssignedPartition> {
+ public final TaskId taskId;
+ public final TopicPartition partition;
+
+ public AssignedPartition(TaskId taskId, TopicPartition partition) {
+ this.taskId = taskId;
+ this.partition = partition;
+ }
+
+ @Override
+ public int compareTo(AssignedPartition that) {
+ return PARTITION_COMPARATOR.compare(this.partition, that.partition);
+ }
+ }
+
+ private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
+ @Override
+ public int compare(TopicPartition p1, TopicPartition p2) {
+ int result = p1.topic().compareTo(p2.topic());
+
+ if (result != 0) {
+ return result;
+ } else {
+ return p1.partition() < p2.partition() ? -1 : (p1.partition() > p2.partition() ? 1 : 0);
+ }
+ }
+ };
+
private StreamThread streamThread;
private int numStandbyReplicas;
@@ -341,20 +370,18 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
final int numConsumers = consumers.size();
- List<TaskId> active = new ArrayList<>();
Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
int i = 0;
for (String consumer : consumers) {
- List<TopicPartition> activePartitions = new ArrayList<>();
+ ArrayList<AssignedPartition> assignedPartitions = new ArrayList<>();
final int numTaskIds = taskIds.size();
for (int j = i; j < numTaskIds; j += numConsumers) {
TaskId taskId = taskIds.get(j);
if (j < numActiveTasks) {
for (TopicPartition partition : partitionsForTask.get(taskId)) {
- activePartitions.add(partition);
- active.add(taskId);
+ assignedPartitions.add(new AssignedPartition(taskId, partition));
}
} else {
Set<TopicPartition> standbyPartitions = standby.get(taskId);
@@ -366,6 +393,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
}
+ Collections.sort(assignedPartitions);
+ List<TaskId> active = new ArrayList<>();
+ List<TopicPartition> activePartitions = new ArrayList<>();
+ for (AssignedPartition partition : assignedPartitions) {
+ active.add(partition.taskId);
+ activePartitions.add(partition.partition);
+ }
+
AssignmentInfo data = new AssignmentInfo(active, standby);
assignment.put(consumer, new Assignment(activePartitions, data.encode()));
i++;
@@ -441,7 +476,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
@Override
public void onAssignment(Assignment assignment) {
- List<TopicPartition> partitions = assignment.partitions();
+ List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
+
+ Collections.sort(partitions, PARTITION_COMPARATOR);
AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
this.standbyTasks = info.standbyTasks;