Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/02 01:00:13 UTC

[GitHub] [kafka] cmccabe opened a new pull request #10463: MINOR: KRaft support for unclean.leader.election.enable

cmccabe opened a new pull request #10463:
URL: https://github.com/apache/kafka/pull/10463


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao commented on a change in pull request #10463: MINOR: KRaft support for unclean.leader.election.enable

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r612769595



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -832,13 +833,33 @@ void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records) {
                 throw new RuntimeException("Partition " + topicIdPartition +
                     " existed in isrMembers, but not in the partitions map.");
             }
-            // TODO: if this partition is configured for unclean leader election,
-            // check the replica set rather than the ISR.
-            if (Replicas.contains(partition.isr, brokerId)) {
-                records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
+            boolean brokerInIsr = Replicas.contains(partition.isr, brokerId);
+            boolean shouldBecomeLeader;
+            if (configurationControl.shouldUseUncleanLeaderElection(topic.name)) {
+                shouldBecomeLeader = Replicas.contains(partition.replicas, brokerId);
+            } else {
+                shouldBecomeLeader = brokerInIsr;
+            }
+            if (shouldBecomeLeader) {
+                if (brokerInIsr) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("The newly active node {} will be the leader for the " +
+                            "previously offline partition {}.",
+                            brokerId, topicIdPartition);
+                    }
+                } else {
+                    log.info("The newly active node {} will be the leader for the " +
+                        "previously offline partition {}, after an UNCLEAN leader election.",
+                        brokerId, topicIdPartition);
+                }
+                PartitionChangeRecord record = new PartitionChangeRecord().
                     setPartitionId(topicIdPartition.partitionId()).
                     setTopicId(topic.id).
-                    setLeader(brokerId), (short) 0));
+                    setLeader(brokerId);
+                if (!brokerInIsr) {
+                    record.setIsr(Replicas.toList(partition.isr, brokerId));

Review comment:
       Hmm, if we perform an unclean leader election, the only replica in the ISR should be the new leader, since the data in the existing ISR is not guaranteed to match the new leader.
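
To illustrate the point above, here is a minimal sketch (not code from the PR) of how the recorded ISR could be chosen. The class and method names are hypothetical, and the ISR is modeled as a plain `int[]` for brevity: a clean election leaves the ISR untouched, while an unclean one shrinks it to the new leader alone.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration only; it is not part of the Kafka code base.
final class IsrAfterElection {
    // Returns the ISR that should be recorded once newLeader takes over.
    static List<Integer> isrAfterElection(int[] currentIsr, int newLeader) {
        List<Integer> isr = new ArrayList<>();
        boolean leaderInIsr = false;
        for (int replica : currentIsr) {
            isr.add(replica);
            if (replica == newLeader) {
                leaderInIsr = true;
            }
        }
        // Clean election: the new leader was already in sync, so the ISR is unchanged.
        // Unclean election: the old ISR may not match the new leader's log,
        // so the ISR becomes the new leader only.
        return leaderInIsr ? isr : Collections.singletonList(newLeader);
    }
}
```

With this shape, the unclean path never carries stale ISR members forward, which is the concern raised in the comment.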

##########
File path: metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
##########
@@ -245,4 +246,32 @@ public void testLegacyAlterConfigs() {
             manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", "901")))))
         );
     }
+
+    @Test
+    public void testShouldUseUncleanLeaderElection() throws Exception {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        ConfigurationControlManager manager =
+            new ConfigurationControlManager(new LogContext(), 0, snapshotRegistry, CONFIGS);
+        ControllerResult<Map<ConfigResource, ApiError>> result = manager.incrementalAlterConfigs(
+            toMap(entry(BROKER0, toMap(
+                    entry(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, entry(SET, "true")))),
+                entry(MYTOPIC, toMap(
+                    entry(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, entry(SET, "false"))))));
+        Map<ConfigResource, ApiError> expectedResponse = new HashMap<>();
+        expectedResponse.put(BROKER0, ApiError.NONE);
+        expectedResponse.put(MYTOPIC, ApiError.NONE);
+        assertEquals(expectedResponse, result.response());
+        ControllerTestUtils.replayAll(manager, result.records());
+        assertTrue(manager.shouldUseUncleanLeaderElection(MYTOPIC.name()));

Review comment:
       I think in this case, unclean leader election should be false for MYTOPIC, since the topic-level config takes precedence.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -374,6 +381,27 @@ void deleteTopicConfigs(String name) {
         configData.remove(new ConfigResource(Type.TOPIC, name));
     }
 
+    private boolean getBoolean(ConfigResource configResource, String key) {
+        TimelineHashMap<String, String> map = configData.get(configResource);
+        if (map == null) return false;
+        String value = map.getOrDefault(key, "false");
+        return value.equalsIgnoreCase("true");
+    }
+
+    /**
+     * Check if the given topic should use an unclean leader election.
+     *
+     * @param topicName     The topic name.
+     * @return              True if the controller or topic was configured to use unclean
+     *                      leader election.
+     */
+    boolean shouldUseUncleanLeaderElection(String topicName) {
+        // Check the node config, cluster config, and topic config.
+        return getBoolean(currentNodeResource, UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG) ||

Review comment:
       Hmm, the priority of a config is topic, node, node default. So, if the config is set at the topic level, we should just use the topic level setting and ignore the node level setting. If the config is not set at the topic level explicitly, then we move to the node level. If the node level is not set, then we move to the node default. 
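
The precedence described above can be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: the class name, the method name, and the use of plain `Map`s for each config level are hypothetical, and the final fallback of `false` is assumed to be the static default.

```java
import java.util.Arrays;
import java.util.Map;

// Hypothetical sketch of topic > node > node-default precedence.
final class UncleanConfigPrecedence {
    static final String KEY = "unclean.leader.election.enable";

    static boolean resolve(Map<String, String> topic,
                           Map<String, String> node,
                           Map<String, String> nodeDefault) {
        // Walk the levels from most specific to least specific and
        // stop at the first one that sets the key explicitly.
        for (Map<String, String> level : Arrays.asList(topic, node, nodeDefault)) {
            String value = level.get(KEY);
            if (value != null) {
                // In this sketch, any explicitly set value other than "true"
                // (case-insensitive) counts as false.
                return Boolean.parseBoolean(value);
            }
        }
        // Assumed static default when no level sets the key.
        return false;
    }
}
```

Unlike an OR across all levels, this lets a topic set `false` even when the node sets `true`.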

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -374,6 +381,27 @@ void deleteTopicConfigs(String name) {
         configData.remove(new ConfigResource(Type.TOPIC, name));
     }
 
+    private boolean getBoolean(ConfigResource configResource, String key) {
+        TimelineHashMap<String, String> map = configData.get(configResource);
+        if (map == null) return false;
+        String value = map.getOrDefault(key, "false");
+        return value.equalsIgnoreCase("true");
+    }
+
+    /**
+     * Check if the given topic should use an unclean leader election.
+     *
+     * @param topicName     The topic name.
+     * @return              True if the controller or topic was configured to use unclean
+     *                      leader election.
+     */
+    boolean shouldUseUncleanLeaderElection(String topicName) {

Review comment:
       If unclean leader election is enabled through a config change, we need to trigger a leader election on all relevant partitions without a current leader. 
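
As a rough sketch of what such a scan might look like (hypothetical names throughout, not the controller's actual data structures), the idea is to visit only partitions that currently have no leader and pick a live replica for each. The sketch assumes every partition passed in belongs to a topic for which unclean election is now enabled, and it uses `-1` to mean "no leader".

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of scanning leaderless partitions after a config change.
final class OfflinePartitionScan {
    static final int NO_LEADER = -1;

    static final class Partition {
        final String name;
        final int leader;
        final int[] replicas;

        Partition(String name, int leader, int[] replicas) {
            this.name = name;
            this.leader = leader;
            this.replicas = replicas;
        }
    }

    // Maps each leaderless partition to the first live replica in its replica list.
    // Partitions that already have a leader, or have no live replica, are skipped.
    static Map<String, Integer> chooseLeaders(List<Partition> partitions,
                                              Set<Integer> liveBrokers) {
        Map<String, Integer> chosen = new LinkedHashMap<>();
        for (Partition partition : partitions) {
            if (partition.leader != NO_LEADER) {
                continue;
            }
            for (int replica : partition.replicas) {
                if (liveBrokers.contains(replica)) {
                    chosen.put(partition.name, replica);
                    break;
                }
            }
        }
        return chosen;
    }
}
```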







[GitHub] [kafka] cmccabe commented on a change in pull request #10463: MINOR: KRaft support for unclean.leader.election.enable

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r613646109



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ElectionStrategizer.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+
+
+class ElectionStrategizer {
+    private static final Logger log = LoggerFactory.getLogger(ElectionStrategizer.class);
+
+    private final int nodeId;
+    private Boolean nodeUncleanConfig = null;
+    private Boolean clusterUncleanConfig = null;
+    private Function<String, String> topicUncleanConfigAccessor = __ -> "false";
+    private Map<String, String> topicUncleanOverrides = new HashMap<>();
+
+    ElectionStrategizer(int nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    ElectionStrategizer setNodeUncleanConfig(String nodeUncleanConfig) {
+        this.nodeUncleanConfig = parseBoolean("node", nodeUncleanConfig);
+        return this;
+    }
+
+    ElectionStrategizer setClusterUncleanConfig(String clusterUncleanConfig) {
+        this.clusterUncleanConfig = parseBoolean("cluster", clusterUncleanConfig);
+        return this;
+    }
+
+    ElectionStrategizer setTopicUncleanConfigAccessor(
+            Function<String, String> topicUncleanConfigAccessor) {
+        this.topicUncleanConfigAccessor = topicUncleanConfigAccessor;
+        return this;
+    }
+
+    ElectionStrategizer setTopicUncleanOverride(String topicName, String value) {
+        this.topicUncleanOverrides.put(topicName, value);
+        return this;
+    }
+
+    boolean shouldBeUnclean(String topicName) {
+        Boolean topicConfig = (topicUncleanOverrides.containsKey(topicName)) ?
+            parseBoolean("topic", topicUncleanOverrides.get(topicName)) :
+            parseBoolean("topic", topicUncleanConfigAccessor.apply(topicName));
+        if (topicConfig != null) return topicConfig.booleanValue();
+        if (nodeUncleanConfig != null) return nodeUncleanConfig.booleanValue();
+        if (clusterUncleanConfig != null) return clusterUncleanConfig.booleanValue();
+        return false;
+    }
+
+    // VisibleForTesting
+    Boolean parseBoolean(String what, String value) {
+        if (value == null) return null;
+        if (value.equalsIgnoreCase("true")) return true;
+        if (value.equalsIgnoreCase("false")) return false;
+        if (value.trim().isEmpty()) return null;
+        log.warn("Invalid value for {} config {} on node {}: '{}'. Expected true or false.",

Review comment:
       We are planning to do more validation on configs, but that's separate from this PR, I think.  Even after we implement more validation there will be a need to handle invalid values that exist because of upgrade / downgrade issues and so forth.  So it's good to be robust against them.







[GitHub] [kafka] cmccabe commented on a change in pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r613659239



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ElectionStrategizer.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+
+
+class ElectionStrategizer {
+    private static final Logger log = LoggerFactory.getLogger(ElectionStrategizer.class);
+
+    private final int nodeId;
+    private Boolean nodeUncleanConfig = null;
+    private Boolean clusterUncleanConfig = null;
+    private Function<String, String> topicUncleanConfigAccessor = __ -> "false";
+    private Map<String, String> topicUncleanOverrides = new HashMap<>();
+
+    ElectionStrategizer(int nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    ElectionStrategizer setNodeUncleanConfig(String nodeUncleanConfig) {
+        this.nodeUncleanConfig = parseBoolean("node", nodeUncleanConfig);
+        return this;
+    }
+
+    ElectionStrategizer setClusterUncleanConfig(String clusterUncleanConfig) {
+        this.clusterUncleanConfig = parseBoolean("cluster", clusterUncleanConfig);
+        return this;
+    }
+
+    ElectionStrategizer setTopicUncleanConfigAccessor(
+            Function<String, String> topicUncleanConfigAccessor) {
+        this.topicUncleanConfigAccessor = topicUncleanConfigAccessor;
+        return this;
+    }
+
+    ElectionStrategizer setTopicUncleanOverride(String topicName, String value) {
+        this.topicUncleanOverrides.put(topicName, value);
+        return this;
+    }
+
+    boolean shouldBeUnclean(String topicName) {
+        Boolean topicConfig = (topicUncleanOverrides.containsKey(topicName)) ?
+            parseBoolean("topic", topicUncleanOverrides.get(topicName)) :
+            parseBoolean("topic", topicUncleanConfigAccessor.apply(topicName));
+        if (topicConfig != null) return topicConfig.booleanValue();
+        if (nodeUncleanConfig != null) return nodeUncleanConfig.booleanValue();
+        if (clusterUncleanConfig != null) return clusterUncleanConfig.booleanValue();
+        return false;
+    }
+
+    // VisibleForTesting
+    Boolean parseBoolean(String what, String value) {

Review comment:
       Fair enough.  An `Optional<Boolean>` might be nicer than using `null`.
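
A minimal sketch of the `Optional<Boolean>` variant suggested above, assuming the same parsing rules as the quoted `parseBoolean`. The class name is hypothetical and the warning log for invalid values is left out to keep the example self-contained.

```java
import java.util.Optional;

// Hypothetical sketch; not the code that was merged.
final class OptionalBooleanParser {
    // Returns Optional.empty() for unset, blank, or invalid values
    // instead of returning null.
    static Optional<Boolean> parse(String value) {
        if (value == null) {
            return Optional.empty();
        }
        if (value.equalsIgnoreCase("true")) {
            return Optional.of(true);
        }
        if (value.equalsIgnoreCase("false")) {
            return Optional.of(false);
        }
        // Blank and unrecognized values both mean "not configured at this level".
        // The original code also logs a warning for the unrecognized case.
        return Optional.empty();
    }
}
```

Callers can then chain the levels, for example `topic.orElse(node.orElse(cluster.orElse(false)))`, without null checks.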







[GitHub] [kafka] junrao commented on pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode

Posted by GitBox <gi...@apache.org>.
junrao commented on pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#issuecomment-909609953


   @dielhennr : Thanks. Since trunk has changed quite a bit, it would be useful to pull all the changes into this PR and rebase before reviewing it again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r614404680



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1119,6 +1148,113 @@ void validateManualPartitionAssignment(List<Integer> assignment,
         }
     }
 
+    /**
+     * Handle legacy configuration alterations.
+     */
+    ControllerResult<Map<ConfigResource, ApiError>> legacyAlterConfigs(
+            Map<ConfigResource, Map<String, String>> newConfigs) {
+        ControllerResult<Map<ConfigResource, ApiError>> result =
+            configurationControl.legacyAlterConfigs(newConfigs);
+        return alterConfigs(result);
+    }
+
+    /**
+     * Handle incremental configuration alterations.
+     */
+    ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
+            Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges) {
+        ControllerResult<Map<ConfigResource, ApiError>> result =
+            configurationControl.incrementalAlterConfigs(configChanges);
+        return alterConfigs(result);
+    }
+
+    /**
+     * If important controller configurations were changed, generate records which will
+     * apply the changes.
+     */
+    ControllerResult<Map<ConfigResource, ApiError>> alterConfigs(
+            ControllerResult<Map<ConfigResource, ApiError>> result) {
+        ElectionStrategizer strategizer = examineConfigAlterations(result.records());
+        boolean isAtomic = true;
+        List<ApiMessageAndVersion> records = result.records();
+        if (strategizer != null) {
+            records.addAll(handleLeaderElectionConfigChanges(strategizer));
+            isAtomic = false;

Review comment:
       I changed the approach here in the latest PR.  Now, rather than combining the records into a single list, I schedule another task to scan through the offline partitions.






[GitHub] [kafka] cmccabe commented on a change in pull request #10463: MINOR: KRaft support for unclean.leader.election.enable

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r612795358



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -832,13 +833,33 @@ void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records) {
                 throw new RuntimeException("Partition " + topicIdPartition +
                     " existed in isrMembers, but not in the partitions map.");
             }
-            // TODO: if this partition is configured for unclean leader election,
-            // check the replica set rather than the ISR.
-            if (Replicas.contains(partition.isr, brokerId)) {
-                records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
+            boolean brokerInIsr = Replicas.contains(partition.isr, brokerId);
+            boolean shouldBecomeLeader;
+            if (configurationControl.shouldUseUncleanLeaderElection(topic.name)) {
+                shouldBecomeLeader = Replicas.contains(partition.replicas, brokerId);
+            } else {
+                shouldBecomeLeader = brokerInIsr;
+            }
+            if (shouldBecomeLeader) {
+                if (brokerInIsr) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("The newly active node {} will be the leader for the " +
+                            "previously offline partition {}.",
+                            brokerId, topicIdPartition);
+                    }
+                } else {
+                    log.info("The newly active node {} will be the leader for the " +
+                        "previously offline partition {}, after an UNCLEAN leader election.",
+                        brokerId, topicIdPartition);
+                }
+                PartitionChangeRecord record = new PartitionChangeRecord().
                     setPartitionId(topicIdPartition.partitionId()).
                     setTopicId(topic.id).
-                    setLeader(brokerId), (short) 0));
+                    setLeader(brokerId);
+                if (!brokerInIsr) {
+                    record.setIsr(Replicas.toList(partition.isr, brokerId));

Review comment:
       Good catch







[GitHub] [kafka] dielhennr commented on a change in pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r672693205



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -42,23 +42,38 @@
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.function.Function;
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
+import static org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG;
 
 
 public class ConfigurationControlManager {
+    static final ConfigResource DEFAULT_NODE_RESOURCE = new ConfigResource(Type.BROKER, "");
+
     private final Logger log;
+    private final int nodeId;
+    private final ConfigResource currentNodeResource;
     private final SnapshotRegistry snapshotRegistry;
     private final Map<ConfigResource.Type, ConfigDef> configDefs;
+    private final TimelineHashMap<String, String> emptyMap;

Review comment:
       I don't think it can be. It needs to be a TimelineHashMap to work and needs to receive the snapshot registry in the constructor.







[GitHub] [kafka] junrao commented on pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode

Posted by GitBox <gi...@apache.org>.
junrao commented on pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#issuecomment-906572591


   Thanks @dielhennr . Could you rebase this PR?





[GitHub] [kafka] cmccabe commented on a change in pull request #10463: MINOR: KRaft support for unclean.leader.election.enable

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r612786365



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -374,6 +381,27 @@ void deleteTopicConfigs(String name) {
         configData.remove(new ConfigResource(Type.TOPIC, name));
     }
 
+    private boolean getBoolean(ConfigResource configResource, String key) {
+        TimelineHashMap<String, String> map = configData.get(configResource);
+        if (map == null) return false;
+        String value = map.getOrDefault(key, "false");
+        return value.equalsIgnoreCase("true");
+    }
+
+    /**
+     * Check if the given topic should use an unclean leader election.
+     *
+     * @param topicName     The topic name.
+     * @return              True if the controller or topic was configured to use unclean
+     *                      leader election.
+     */
+    boolean shouldUseUncleanLeaderElection(String topicName) {
+        // Check the node config, cluster config, and topic config.
+        return getBoolean(currentNodeResource, UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG) ||

Review comment:
       Hmm.  So an individual topic could configure unclean leader election as false even if the node sets it to true?  I will implement that, then...







[GitHub] [kafka] cmccabe commented on pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#issuecomment-819922989


   @junrao : I filed https://issues.apache.org/jira/browse/KAFKA-12670 for this PR.





[GitHub] [kafka] junrao commented on a change in pull request #10463: MINOR: KRaft support for unclean.leader.election.enable

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r612792056



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -374,6 +381,27 @@ void deleteTopicConfigs(String name) {
         configData.remove(new ConfigResource(Type.TOPIC, name));
     }
 
+    private boolean getBoolean(ConfigResource configResource, String key) {
+        TimelineHashMap<String, String> map = configData.get(configResource);
+        if (map == null) return false;
+        String value = map.getOrDefault(key, "false");
+        return value.equalsIgnoreCase("true");
+    }
+
+    /**
+     * Check if the given topic should use an unclean leader election.
+     *
+     * @param topicName     The topic name.
+     * @return              True if the controller or topic was configured to use unclean
+     *                      leader election.
+     */
+    boolean shouldUseUncleanLeaderElection(String topicName) {
+        // Check the node config, cluster config, and topic config.
+        return getBoolean(currentNodeResource, UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG) ||

Review comment:
       Right.







[GitHub] [kafka] dielhennr commented on pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode

Posted by GitBox <gi...@apache.org>.
dielhennr commented on pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#issuecomment-889296194


   I stacked https://github.com/cmccabe/kafka/pull/6 on this PR to address some of Jun's comments.





[GitHub] [kafka] dielhennr commented on a change in pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r673531081



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -374,6 +389,20 @@ void deleteTopicConfigs(String name) {
         configData.remove(new ConfigResource(Type.TOPIC, name));
     }
 
+    ConfigResource currentNodeResource() {

Review comment:
       No, this method is used to access the private member `currentNodeResource` from outside of the class.







[GitHub] [kafka] junrao commented on a change in pull request #10463: MINOR: KRaft support for unclean.leader.election.enable

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r613366570



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ElectionStrategizer.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+
+
+class ElectionStrategizer {

Review comment:
       Could we add a comment on what the class does?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ElectionStrategizer.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+
+
+class ElectionStrategizer {
+    private static final Logger log = LoggerFactory.getLogger(ElectionStrategizer.class);
+
+    private final int nodeId;
+    private Boolean nodeUncleanConfig = null;
+    private Boolean clusterUncleanConfig = null;
+    private Function<String, String> topicUncleanConfigAccessor = __ -> "false";
+    private Map<String, String> topicUncleanOverrides = new HashMap<>();
+
+    ElectionStrategizer(int nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    ElectionStrategizer setNodeUncleanConfig(String nodeUncleanConfig) {
+        this.nodeUncleanConfig = parseBoolean("node", nodeUncleanConfig);
+        return this;
+    }
+
+    ElectionStrategizer setClusterUncleanConfig(String clusterUncleanConfig) {
+        this.clusterUncleanConfig = parseBoolean("cluster", clusterUncleanConfig);
+        return this;
+    }
+
+    ElectionStrategizer setTopicUncleanConfigAccessor(
+            Function<String, String> topicUncleanConfigAccessor) {
+        this.topicUncleanConfigAccessor = topicUncleanConfigAccessor;
+        return this;
+    }
+
+    ElectionStrategizer setTopicUncleanOverride(String topicName, String value) {
+        this.topicUncleanOverrides.put(topicName, value);
+        return this;
+    }
+
+    boolean shouldBeUnclean(String topicName) {
+        Boolean topicConfig = (topicUncleanOverrides.containsKey(topicName)) ?
+            parseBoolean("topic", topicUncleanOverrides.get(topicName)) :
+            parseBoolean("topic", topicUncleanConfigAccessor.apply(topicName));
+        if (topicConfig != null) return topicConfig.booleanValue();
+        if (nodeUncleanConfig != null) return nodeUncleanConfig.booleanValue();
+        if (clusterUncleanConfig != null) return clusterUncleanConfig.booleanValue();
+        return false;
+    }
+
+    // VisibleForTesting
+    Boolean parseBoolean(String what, String value) {

Review comment:
       Would it be better to return Optional to handle null more explicitly?
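
   A minimal sketch of what the Optional-based variant suggested here might look like. The class name `UncleanConfigParser` is hypothetical, and treating unrecognized text as "unset" is an assumption made for the sketch, not something the PR confirms.

```java
import java.util.Optional;

// Hypothetical sketch, not the PR's code: an Optional-returning parseBoolean.
final class UncleanConfigParser {
    private UncleanConfigParser() { }

    /**
     * Returns Optional.empty() when the value is unset (null or blank),
     * otherwise the parsed boolean. Unrecognized text is treated as unset
     * here; that choice is an assumption of this sketch.
     */
    static Optional<Boolean> parseBoolean(String value) {
        if (value == null || value.trim().isEmpty()) return Optional.empty();
        if (value.equalsIgnoreCase("true")) return Optional.of(true);
        if (value.equalsIgnoreCase("false")) return Optional.of(false);
        return Optional.empty();
    }
}
```

   With this shape a caller never sees a bare null: it has to pick a fallback explicitly, for example `parseBoolean(value).orElse(false)`.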

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -783,7 +788,8 @@ void handleNodeDeactivated(int brokerId, List<ApiMessageAndVersion> records) {
                 record.setIsr(Replicas.toList(newIsr));
                 if (partition.leader == brokerId) {
                     // The fenced node will no longer be the leader.
-                    int newLeader = bestLeader(partition.replicas, newIsr, false);
+                    int newLeader = bestLeader(partition.replicas, newIsr,

Review comment:
       Hmm, how do we prevent the deactivated broker from being selected as the new leader? It seems that at this point, clusterControl hasn't reflected the fenceBrokerRecord yet.
   
   Also, in the caller handleBrokerFenced(), we add the partitionChangeRecord before the fenceBrokerRecord. Ordering-wise, it seems more natural to add the fenceBrokerRecord first. Then it's clear that the partitionChangeRecord is the result of the fenceBrokerRecord.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ElectionStrategizer.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+
+
+class ElectionStrategizer {
+    private static final Logger log = LoggerFactory.getLogger(ElectionStrategizer.class);
+
+    private final int nodeId;
+    private Boolean nodeUncleanConfig = null;
+    private Boolean clusterUncleanConfig = null;
+    private Function<String, String> topicUncleanConfigAccessor = __ -> "false";
+    private Map<String, String> topicUncleanOverrides = new HashMap<>();
+
+    ElectionStrategizer(int nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    ElectionStrategizer setNodeUncleanConfig(String nodeUncleanConfig) {
+        this.nodeUncleanConfig = parseBoolean("node", nodeUncleanConfig);
+        return this;
+    }
+
+    ElectionStrategizer setClusterUncleanConfig(String clusterUncleanConfig) {
+        this.clusterUncleanConfig = parseBoolean("cluster", clusterUncleanConfig);
+        return this;
+    }
+
+    ElectionStrategizer setTopicUncleanConfigAccessor(
+            Function<String, String> topicUncleanConfigAccessor) {
+        this.topicUncleanConfigAccessor = topicUncleanConfigAccessor;
+        return this;
+    }
+
+    ElectionStrategizer setTopicUncleanOverride(String topicName, String value) {
+        this.topicUncleanOverrides.put(topicName, value);
+        return this;
+    }
+
+    boolean shouldBeUnclean(String topicName) {
+        Boolean topicConfig = (topicUncleanOverrides.containsKey(topicName)) ?
+            parseBoolean("topic", topicUncleanOverrides.get(topicName)) :
+            parseBoolean("topic", topicUncleanConfigAccessor.apply(topicName));
+        if (topicConfig != null) return topicConfig.booleanValue();
+        if (nodeUncleanConfig != null) return nodeUncleanConfig.booleanValue();
+        if (clusterUncleanConfig != null) return clusterUncleanConfig.booleanValue();
+        return false;
+    }
+
+    // VisibleForTesting
+    Boolean parseBoolean(String what, String value) {
+        if (value == null) return null;
+        if (value.equalsIgnoreCase("true")) return true;
+        if (value.equalsIgnoreCase("false")) return false;
+        if (value.trim().isEmpty()) return null;
+        log.warn("Invalid value for {} config {} on node {}: '{}'. Expected true or false.",

Review comment:
       If the boolean is set to an invalid value, it seems that we should send an error to the caller?
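
   One way this could be sketched: keep the nullable return for "unset", but throw instead of logging when the text is neither true nor false. `StrictUncleanConfigParser` is a hypothetical name, and `IllegalArgumentException` is only a stand-in for whatever error type the controller would actually surface to the caller.

```java
// Hypothetical sketch, not the PR's code: reject invalid values instead of
// only logging a warning.
final class StrictUncleanConfigParser {
    private StrictUncleanConfigParser() { }

    /**
     * Returns null when the value is unset (null or blank), the parsed
     * boolean when it is "true" or "false" (case-insensitive), and throws
     * for anything else so that the caller can report the error.
     */
    static Boolean parseBoolean(String what, String value) {
        if (value == null || value.trim().isEmpty()) return null;
        if (value.equalsIgnoreCase("true")) return true;
        if (value.equalsIgnoreCase("false")) return false;
        // IllegalArgumentException is a stand-in chosen for this sketch.
        throw new IllegalArgumentException("Invalid value for " + what +
            " config: '" + value + "'. Expected true or false.");
    }
}
```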

##########
File path: metadata/src/main/java/org/apache/kafka/controller/Replicas.java
##########
@@ -43,6 +43,18 @@
         return list;
     }
 
+    /**
+     * Convert an array of integers to a list of ints and append a final element.
+     *
+     * @param array         The input array.
+     * @return              The output list.
+     */
+    public static List<Integer> toList(int[] array, int last) {

Review comment:
       It seems that this method is now only used in tests and can be removed?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -832,13 +839,35 @@ void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records) {
                 throw new RuntimeException("Partition " + topicIdPartition +
                     " existed in isrMembers, but not in the partitions map.");
             }
-            // TODO: if this partition is configured for unclean leader election,
-            // check the replica set rather than the ISR.
-            if (Replicas.contains(partition.isr, brokerId)) {
-                records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
-                    setPartitionId(topicIdPartition.partitionId()).
-                    setTopicId(topic.id).
-                    setLeader(brokerId), (short) 0));
+            if (strategizer.shouldBeUnclean(topic.name)) {
+                if (Replicas.contains(partition.replicas, brokerId)) {
+                    // Perform an unclean leader election.  The only entry in the new ISR
+                    // will be the current broker, since the data in the existing ISR is
+                    // not guaranteed to match with the new leader.
+                    log.info("The newly active node {} will be the leader for the " +
+                            "previously offline partition {}, after an UNCLEAN leader election.",
+                        brokerId, topicIdPartition);
+                    PartitionChangeRecord record = new PartitionChangeRecord().
+                        setPartitionId(topicIdPartition.partitionId()).
+                        setTopicId(topic.id).
+                        setLeader(brokerId).
+                        setIsr(Collections.singletonList(brokerId));
+                    records.add(new ApiMessageAndVersion(record, (short) 0));
+                }
+            } else {
+                if (Replicas.contains(partition.isr, brokerId)) {
+                    // Perform a clean leader election.
+                    if (log.isDebugEnabled()) {
+                        log.debug("The newly active node {} will be the leader for the " +
+                                "previously offline partition {}.",
+                            brokerId, topicIdPartition);
+                    }
+                    PartitionChangeRecord record = new PartitionChangeRecord().
+                        setPartitionId(topicIdPartition.partitionId()).
+                        setTopicId(topic.id).
+                        setLeader(brokerId);

Review comment:
       Should we set the ISR for this partition too?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -783,7 +788,8 @@ void handleNodeDeactivated(int brokerId, List<ApiMessageAndVersion> records) {
                 record.setIsr(Replicas.toList(newIsr));
                 if (partition.leader == brokerId) {
                     // The fenced node will no longer be the leader.
-                    int newLeader = bestLeader(partition.replicas, newIsr, false);
+                    int newLeader = bestLeader(partition.replicas, newIsr,
+                        strategizer.shouldBeUnclean(topic.name));

Review comment:
       If there is an unclean leader election, we need to reset the ISR to just the leader. Since this needs to be done in multiple places, perhaps we could change bestLeader() to return both the new leader and the ISR.
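
   A rough sketch of the shape this suggestion could take. Everything here (`LeaderElectionSketch`, `Result`, `NO_LEADER`, the `isActive` predicate) is hypothetical and only illustrates returning the leader and the ISR together; it is not the signature of the real bestLeader().

```java
import java.util.function.IntPredicate;

// Hypothetical sketch, not the PR's code: bestLeader() returning both the
// chosen leader and the ISR that should be recorded with it.
final class LeaderElectionSketch {
    static final int NO_LEADER = -1;

    static final class Result {
        final int leader;
        final int[] isr;
        final boolean unclean;

        Result(int leader, int[] isr, boolean unclean) {
            this.leader = leader;
            this.isr = isr;
            this.unclean = unclean;
        }
    }

    static Result bestLeader(int[] replicas, int[] isr, boolean uncleanOk,
                             IntPredicate isActive) {
        // Clean election: first active replica, in assignment order, that is in the ISR.
        for (int replica : replicas) {
            if (contains(isr, replica) && isActive.test(replica)) {
                return new Result(replica, isr, false);
            }
        }
        // Unclean election: any active replica; the ISR is reset to just the leader.
        if (uncleanOk) {
            for (int replica : replicas) {
                if (isActive.test(replica)) {
                    return new Result(replica, new int[] {replica}, true);
                }
            }
        }
        // No eligible replica: the partition stays offline and the ISR is unchanged.
        return new Result(NO_LEADER, isr, false);
    }

    private static boolean contains(int[] values, int value) {
        for (int v : values) {
            if (v == value) return true;
        }
        return false;
    }
}
```

   Each call site could then copy `leader` and `isr` from the result into its PartitionChangeRecord, so the ISR reset lives in one place.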

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -374,6 +389,20 @@ void deleteTopicConfigs(String name) {
         configData.remove(new ConfigResource(Type.TOPIC, name));
     }
 
+    ConfigResource currentNodeResource() {

Review comment:
       Could this be private?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -832,13 +839,35 @@ void handleNodeActivated(int brokerId, List<ApiMessageAndVersion> records) {
                 throw new RuntimeException("Partition " + topicIdPartition +
                     " existed in isrMembers, but not in the partitions map.");
             }
-            // TODO: if this partition is configured for unclean leader election,
-            // check the replica set rather than the ISR.
-            if (Replicas.contains(partition.isr, brokerId)) {
-                records.add(new ApiMessageAndVersion(new PartitionChangeRecord().
-                    setPartitionId(topicIdPartition.partitionId()).
-                    setTopicId(topic.id).
-                    setLeader(brokerId), (short) 0));
+            if (strategizer.shouldBeUnclean(topic.name)) {
+                if (Replicas.contains(partition.replicas, brokerId)) {
+                    // Perform an unclean leader election.  The only entry in the new ISR
+                    // will be the current broker, since the data in the existing ISR is
+                    // not guaranteed to match with the new leader.
+                    log.info("The newly active node {} will be the leader for the " +
+                            "previously offline partition {}, after an UNCLEAN leader election.",
+                        brokerId, topicIdPartition);
+                    PartitionChangeRecord record = new PartitionChangeRecord().
+                        setPartitionId(topicIdPartition.partitionId()).
+                        setTopicId(topic.id).
+                        setLeader(brokerId).
+                        setIsr(Collections.singletonList(brokerId));
+                    records.add(new ApiMessageAndVersion(record, (short) 0));
+                }
+            } else {
+                if (Replicas.contains(partition.isr, brokerId)) {

Review comment:
       We should prefer the clean leader election even if shouldBeUnclean is true.
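
   A small sketch of the ordering being asked for, assuming the decision can be reduced to a pure function. `ActivationElectionSketch` and the `Election` enum are hypothetical names used only for illustration.

```java
// Hypothetical sketch, not the PR's code: decide what kind of election a
// newly activated broker can win for an offline partition.
final class ActivationElectionSketch {
    enum Election { NONE, CLEAN, UNCLEAN }

    private ActivationElectionSketch() { }

    static Election electionFor(int brokerId, int[] replicas, int[] isr,
                                boolean uncleanEnabled) {
        // Check the ISR first, so a clean election wins even when unclean is enabled.
        if (contains(isr, brokerId)) return Election.CLEAN;
        // Only fall back to the full replica set if unclean election is allowed.
        if (uncleanEnabled && contains(replicas, brokerId)) return Election.UNCLEAN;
        return Election.NONE;
    }

    private static boolean contains(int[] values, int value) {
        for (int v : values) {
            if (v == value) return true;
        }
        return false;
    }
}
```

   In the CLEAN case the record would leave the ISR untouched; only the UNCLEAN case would shrink it to the new leader.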

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -42,23 +42,38 @@
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.function.Function;
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
+import static org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG;
 
 
 public class ConfigurationControlManager {
+    static final ConfigResource DEFAULT_NODE_RESOURCE = new ConfigResource(Type.BROKER, "");
+
     private final Logger log;
+    private final int nodeId;
+    private final ConfigResource currentNodeResource;
     private final SnapshotRegistry snapshotRegistry;
     private final Map<ConfigResource.Type, ConfigDef> configDefs;
+    private final TimelineHashMap<String, String> emptyMap;

Review comment:
       Could this be a static final field?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1119,6 +1148,113 @@ void validateManualPartitionAssignment(List<Integer> assignment,
         }
     }
 
+    /**
+     * Handle legacy configuration alterations.
+     */
+    ControllerResult<Map<ConfigResource, ApiError>> legacyAlterConfigs(
+            Map<ConfigResource, Map<String, String>> newConfigs) {
+        ControllerResult<Map<ConfigResource, ApiError>> result =
+            configurationControl.legacyAlterConfigs(newConfigs);
+        return alterConfigs(result);
+    }
+
+    /**
+     * Handle incremental configuration alterations.
+     */
+    ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
+            Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges) {
+        ControllerResult<Map<ConfigResource, ApiError>> result =
+            configurationControl.incrementalAlterConfigs(configChanges);
+        return alterConfigs(result);
+    }
+
+    /**
+     * If important controller configurations were changed, generate records which will
+     * apply the changes.
+     */
+    ControllerResult<Map<ConfigResource, ApiError>> alterConfigs(
+            ControllerResult<Map<ConfigResource, ApiError>> result) {
+        ElectionStrategizer strategizer = examineConfigAlterations(result.records());
+        boolean isAtomic = true;
+        List<ApiMessageAndVersion> records = result.records();
+        if (strategizer != null) {
+            records.addAll(handleLeaderElectionConfigChanges(strategizer));
+            isAtomic = false;

Review comment:
       The partitionRecord from unclean leader election doesn't need to be applied atomically. However, it seems the config records still need to be applied atomically?







[GitHub] [kafka] dielhennr commented on pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode

Posted by GitBox <gi...@apache.org>.
dielhennr commented on pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#issuecomment-906901483


   @junrao I don't have write access to this fork, which is why I stacked my PR onto this one instead of committing. My PR has no conflicts with this branch, so it should be mergeable. I think @cmccabe will have to rebase this.





[GitHub] [kafka] dielhennr commented on a change in pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r672693205



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -42,23 +42,38 @@
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.function.Function;
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
+import static org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG;
 
 
 public class ConfigurationControlManager {
+    static final ConfigResource DEFAULT_NODE_RESOURCE = new ConfigResource(Type.BROKER, "");
+
     private final Logger log;
+    private final int nodeId;
+    private final ConfigResource currentNodeResource;
     private final SnapshotRegistry snapshotRegistry;
     private final Map<ConfigResource.Type, ConfigDef> configDefs;
+    private final TimelineHashMap<String, String> emptyMap;

Review comment:
       I don't think it can be. It needs to be a TimelineHashMap to work and needs to receive the snapshot registry in the constructor.







[GitHub] [kafka] dielhennr commented on a change in pull request #10463: KAFKA-12670: support unclean.leader.election.enable in KRaft mode

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r673531081



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -374,6 +389,20 @@ void deleteTopicConfigs(String name) {
         configData.remove(new ConfigResource(Type.TOPIC, name));
     }
 
+    ConfigResource currentNodeResource() {

Review comment:
       No, this method is used to access the private variable `currentNodeResource` from outside of the class.







[GitHub] [kafka] cmccabe commented on a change in pull request #10463: MINOR: KRaft support for unclean.leader.election.enable

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r612969287



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -374,6 +381,27 @@ void deleteTopicConfigs(String name) {
         configData.remove(new ConfigResource(Type.TOPIC, name));
     }
 
+    private boolean getBoolean(ConfigResource configResource, String key) {
+        TimelineHashMap<String, String> map = configData.get(configResource);
+        if (map == null) return false;
+        String value = map.getOrDefault(key, "false");
+        return value.equalsIgnoreCase("true");
+    }
+
+    /**
+     * Check if the given topic should use an unclean leader election.
+     *
+     * @param topicName     The topic name.
+     * @return              True if the controller or topic was configured to use unclean
+     *                      leader election.
+     */
+    boolean shouldUseUncleanLeaderElection(String topicName) {

Review comment:
       Thanks for finding this!  I implemented leader election triggered by setting the unclean leader election config.







[GitHub] [kafka] cmccabe commented on a change in pull request #10463: MINOR: KRaft support for unclean.leader.election.enable

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10463:
URL: https://github.com/apache/kafka/pull/10463#discussion_r612786365



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
##########
@@ -374,6 +381,27 @@ void deleteTopicConfigs(String name) {
         configData.remove(new ConfigResource(Type.TOPIC, name));
     }
 
+    private boolean getBoolean(ConfigResource configResource, String key) {
+        TimelineHashMap<String, String> map = configData.get(configResource);
+        if (map == null) return false;
+        String value = map.getOrDefault(key, "false");
+        return value.equalsIgnoreCase("true");
+    }
+
+    /**
+     * Check if the given topic should use an unclean leader election.
+     *
+     * @param topicName     The topic name.
+     * @return              True if the controller or topic was configured to use unclean
+     *                      leader election.
+     */
+    boolean shouldUseUncleanLeaderElection(String topicName) {
+        // Check the node config, cluster config, and topic config.
+        return getBoolean(currentNodeResource, UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG) ||

Review comment:
       So an individual topic could configure unclean leader election as false even if the node sets it to true?
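
   To make the difference concrete, here is a minimal sketch contrasting the two behaviors. It assumes the quoted expression ORs the three levels together (the hunk is cut off after the first operand, so that is an assumption), and all names are hypothetical. A null argument stands for "not set at this level".

```java
// Hypothetical sketch, not the PR's code: OR-ing the levels versus letting
// the most specific setting win.
final class UncleanPrecedenceSketch {
    private UncleanPrecedenceSketch() { }

    /** Any level set to true enables unclean election; a topic cannot opt out. */
    static boolean orBased(Boolean topic, Boolean node, Boolean cluster) {
        return isTrue(topic) || isTrue(node) || isTrue(cluster);
    }

    /** The most specific level that is set decides: topic, then node, then cluster. */
    static boolean mostSpecificWins(Boolean topic, Boolean node, Boolean cluster) {
        if (topic != null) return topic;
        if (node != null) return node;
        if (cluster != null) return cluster;
        return false;
    }

    private static boolean isTrue(Boolean value) {
        return value != null && value;
    }
}
```

   With topic set to false and node set to true, `orBased` yields true while `mostSpecificWins` yields false, which is the override behavior the question is about.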



