You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/21 08:45:52 UTC

[GitHub] [kafka] chia7712 commented on a change in pull request #10564: MINOR: clean up some replication code

chia7712 commented on a change in pull request #10564:
URL: https://github.com/apache/kafka/pull/10564#discussion_r617297824



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -321,22 +339,17 @@ public void replay(PartitionRecord record) {
         }
         PartitionControlInfo newPartInfo = new PartitionControlInfo(record);
         PartitionControlInfo prevPartInfo = topicInfo.parts.get(record.partitionId());
+        String topicPart = topicInfo.name + "-" + record.partitionId();
         if (prevPartInfo == null) {
-            log.info("Created partition {}:{} with {}.", record.topicId(),
-                record.partitionId(), newPartInfo.toString());
+            log.info("Created partition {} with {}.", topicPart, newPartInfo.toString());

Review comment:
       Is it intentional to use "name" to replace "id"?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1119,6 +1053,62 @@ void validateManualPartitionAssignment(List<Integer> assignment,
         }
     }
 
+    void generateLeaderAndIsrUpdates(String context,
+                                     int brokerToRemoveFromIsr,
+                                     List<ApiMessageAndVersion> records,
+                                     Iterator<TopicIdPartition> iterator) {
+        int oldSize = records.size();
+        while (iterator.hasNext()) {
+            TopicIdPartition topicIdPart = iterator.next();
+            TopicControlInfo topic = topics.get(topicIdPart.topicId());
+            if (topic == null) {
+                throw new RuntimeException("Topic ID " + topicIdPart.topicId() + " existed in " +
+                    "isrMembers, but not in the topics map.");
+            }
+            PartitionControlInfo partition = topic.parts.get(topicIdPart.partitionId());
+            if (partition == null) {
+                throw new RuntimeException("Partition " + topicIdPart +
+                    " existed in isrMembers, but not in the partitions map.");
+            }
+            int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemoveFromIsr);
+            int newLeader = Replicas.contains(newIsr, partition.leader) ? partition.leader :
+                bestLeader(partition.replicas, newIsr, false);
+            boolean unclean = newLeader != NO_LEADER && !Replicas.contains(newIsr, newLeader);

Review comment:
       Could we reuse `wasCleanlyDerivedFrom` to get "unclean"?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -907,13 +843,13 @@ ApiError electLeader(String topic, int partitionId, boolean unclean,
         }
         PartitionChangeRecord record = new PartitionChangeRecord().
             setPartitionId(partitionId).
-            setTopicId(topicId);
-        if (unclean && !Replicas.contains(partitionInfo.isr, newLeader)) {
-            // If the election was unclean, we may have to forcibly add the replica to
-            // the ISR.  This can result in data loss!
+            setTopicId(topicId).
+            setLeader(newLeader);
+        if (!Replicas.contains(partitionInfo.isr, newLeader)) {
+            // If the election was unclean, we have to forcibly set the ISR to just the
+            // new leader.  This can result in data loss!

Review comment:
       redundant "space"

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1119,6 +1053,62 @@ void validateManualPartitionAssignment(List<Integer> assignment,
         }
     }
 
+    void generateLeaderAndIsrUpdates(String context,
+                                     int brokerToRemoveFromIsr,
+                                     List<ApiMessageAndVersion> records,
+                                     Iterator<TopicIdPartition> iterator) {
+        int oldSize = records.size();
+        while (iterator.hasNext()) {
+            TopicIdPartition topicIdPart = iterator.next();
+            TopicControlInfo topic = topics.get(topicIdPart.topicId());
+            if (topic == null) {
+                throw new RuntimeException("Topic ID " + topicIdPart.topicId() + " existed in " +
+                    "isrMembers, but not in the topics map.");
+            }
+            PartitionControlInfo partition = topic.parts.get(topicIdPart.partitionId());
+            if (partition == null) {
+                throw new RuntimeException("Partition " + topicIdPart +
+                    " existed in isrMembers, but not in the partitions map.");
+            }
+            int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemoveFromIsr);
+            int newLeader = Replicas.contains(newIsr, partition.leader) ? partition.leader :
+                bestLeader(partition.replicas, newIsr, false);
+            boolean unclean = newLeader != NO_LEADER && !Replicas.contains(newIsr, newLeader);
+            if (unclean) {
+                // After an unclean leader election, the ISR is reset to just the new leader.
+                newIsr = new int[] {newLeader};
+            } else if (newIsr.length == 0) {
+                // We never want to shrink the ISR to size 0.

Review comment:
       The `newLeader` is `NO_LEADER` (due to empty `newIsr`) so we could set `NO_LEADER` to `record`. Does it make sense?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org