Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/05 03:06:39 UTC

[GitHub] [kafka] dielhennr opened a new pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

dielhennr opened a new pull request #11179:
URL: https://github.com/apache/kafka/pull/11179


   Under certain configurations it is possible for the Kafka server to boot up as a broker only but still act as the cluster metadata quorum leader. We should validate the configuration to avoid this case.
   
   https://issues.apache.org/jira/browse/KAFKA-13165
   
   Tested manually by starting up a broker and a controller, each with valid and invalid configurations.
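
A minimal sketch of the rule this PR validates, assuming a simplified model in which roles are plain strings and the voters value is a comma-separated list of `id@host:port` entries. `QuorumVoterSketch`, `parseVoterIds`, and `check` are hypothetical names for illustration, not Kafka APIs.

```scala
object QuorumVoterSketch {
  // Hypothetical helper: extract the ids from "1@host:9092,2@host:9093".
  // Assumes every non-empty entry starts with an integer id followed by '@'.
  def parseVoterIds(voters: String): Set[Int] =
    voters.split(",").map(_.trim).filter(_.nonEmpty).map { entry =>
      entry.takeWhile(_ != '@').toInt
    }.toSet

  // Returns None when the combination is consistent, or Some(message) otherwise.
  def check(roles: Set[String], nodeId: Int, voters: String): Option[String] = {
    val voterIds = parseVoterIds(voters)
    if (roles.contains("controller") && !voterIds.contains(nodeId))
      Some(s"node id $nodeId must be included in the set of voters $voterIds")
    else if (!roles.contains("controller") && voterIds.contains(nodeId))
      Some(s"node id $nodeId must not be included in the set of voters $voterIds")
    else
      None
  }
}
```

In this model, a broker-only process whose node id appears in the voter list is the case described above, and `check` reports it instead of letting the process start.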


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683690243



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1949,6 +1951,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       )
     }
 
+    val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty
+    if (voterIds.nonEmpty) {
+      if (processRoles.contains(ControllerRole)) {
+        // Ensure that controllers use their node.id as a voter in controller.quorum.voters 
+        require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+      } else {
+        // Ensure that the broker's node.id is not an id in controller.quorum.voters
+        require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+      }
+    }
+

Review comment:
       Good catch







[GitHub] [kafka] jsancio commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683110083



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1949,6 +1951,15 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       )
     }
 
+    val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty
+    if (processRoles.contains(ControllerRole)) {
+      // Ensure that controllers use their node.id as a voter in controller.quorum.voters 
+      require(voterIds.contains(nodeId), s"The controller must contain a voter for it's ${KafkaConfig.NodeIdProp}=$nodeId in ${RaftConfig.QUORUM_VOTERS_CONFIG}.")
+    } else {
+      // Ensure that the broker's node.id is not an id in controller.quorum.voters
+      require(!voterIds.contains(nodeId), s"Since ${KafkaConfig.ProcessRolesProp}=broker, the the broker's ${KafkaConfig.NodeIdProp}=nodeId should not be in ${RaftConfig.QUORUM_VOTERS_CONFIG}")

Review comment:
       How about "If ${....process role config...} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${QUORUM_VOTERS_CONFIG}=$voterIds"?

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1949,6 +1951,15 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       )
     }
 
+    val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty
+    if (processRoles.contains(ControllerRole)) {
+      // Ensure that controllers use their node.id as a voter in controller.quorum.voters 
+      require(voterIds.contains(nodeId), s"The controller must contain a voter for it's ${KafkaConfig.NodeIdProp}=$nodeId in ${RaftConfig.QUORUM_VOTERS_CONFIG}.")

Review comment:
       How about "If ${....process role config...} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${QUORUM_VOTERS_CONFIG}=$voterIds"?







[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r684692820



##########
File path: core/src/test/scala/unit/kafka/KafkaConfigTest.scala
##########
@@ -79,6 +81,71 @@ class KafkaTest {
     assertThrows(classOf[FatalExitError], () => KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2"))))
   }
 
+  @Test
+  def testBrokerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that brokers do not use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testControllerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that controllers use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testColocatedRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))
+  }

Review comment:
       ```
       // Ensure that if neither process.roles nor controller.quorum.voters is populated, then an exception is thrown if zookeeper.connect is not defined
       propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "")
       assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))

       // Ensure that no exception is thrown once zookeeper.connect is defined
       propertiesFile.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
       KafkaConfig.fromProps(propertiesFile)
       ```







[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683880423



##########
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##########
@@ -34,21 +34,30 @@ import org.mockito.Mockito._
 
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {

Review comment:
       I'll switch it to use `tempDir` shortly.







[GitHub] [kafka] mumrah commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683654444



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1949,6 +1951,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       )
     }
 
+    val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty
+    if (voterIds.nonEmpty) {
+      if (processRoles.contains(ControllerRole)) {
+        // Ensure that controllers use their node.id as a voter in controller.quorum.voters 
+        require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+      } else {
+        // Ensure that the broker's node.id is not an id in controller.quorum.voters
+        require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+      }
+    }
+

Review comment:
       Right, but if the voters list _is_ empty, and we are in self-managed mode, I believe we should throw an error.







[GitHub] [kafka] mumrah commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683552057



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1949,6 +1951,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       )
     }
 
+    val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty
+    if (voterIds.nonEmpty) {
+      if (processRoles.contains(ControllerRole)) {
+        // Ensure that controllers use their node.id as a voter in controller.quorum.voters 
+        require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+      } else {
+        // Ensure that the broker's node.id is not an id in controller.quorum.voters
+        require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+      }
+    }
+

Review comment:
       I think we need an additional check that the voter list is not empty if `process.roles` is set. Otherwise we will skip the validation that `node.id` is included in the voter list when we are a controller.
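
To illustrate the gap, here is a small sketch under simplifying assumptions: the role is reduced to a boolean and the voter ids to a `Set[Int]`. `EmptyVotersSketch` and both method names are hypothetical and only model the shape of the check in the diff.

```scala
object EmptyVotersSketch {
  // Models the diff under review: an empty voter set skips every check.
  def guardedIsValid(isController: Boolean, nodeId: Int, voterIds: Set[Int]): Boolean =
    if (voterIds.nonEmpty) {
      if (isController) voterIds.contains(nodeId) else !voterIds.contains(nodeId)
    } else {
      true
    }

  // Models the additional check: an empty voter set is rejected outright.
  def strictIsValid(isController: Boolean, nodeId: Int, voterIds: Set[Int]): Boolean =
    voterIds.nonEmpty &&
      (if (isController) voterIds.contains(nodeId) else !voterIds.contains(nodeId))
}
```

With an empty voter set, `guardedIsValid` accepts a controller even though its node id is in no voter list, while `strictIsValid` rejects it.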
   







[GitHub] [kafka] jsancio commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683859800



##########
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##########
@@ -262,6 +262,8 @@ class KafkaConfigTest {
     props.put(KafkaConfig.ProcessRolesProp, "controller")
     props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
     props.put(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, s"1@localhost:9092")

Review comment:
       This is a regular string, so string interpolation (`s"..."`) is not needed. This comment applies to a few places.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1949,6 +1951,19 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       )
     }
 
+    val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty
+    if (voterIds.nonEmpty) {
+      if (processRoles.contains(ControllerRole)) {
+        // Ensure that controllers use their node.id as a voter in controller.quorum.voters 
+        require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+      } else {
+        // Ensure that the broker's node.id is not an id in controller.quorum.voters
+        require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+      }
+    } else if (usesSelfManagedQuorum) {
+      throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${RaftConfig.QUORUM_VOTERS_CONFIG} must contain a parseable set of voters.")
+    }

Review comment:
       ```scala
       if (usesSelfManagedQuorum) {
         val voterIds = RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet
         if (voterIds.isEmpty) {
           throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${RaftConfig.QUORUM_VOTERS_CONFIG} must contain a parseable set of voters.")
         } else if (processRoles.contains(ControllerRole)) {
           // Ensure that controllers use their node.id as a voter in controller.quorum.voters 
           require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
         } else {
           // Ensure that the broker's node.id is not an id in controller.quorum.voters
           require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
         }
       }
       ```

##########
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##########
@@ -34,21 +34,30 @@ import org.mockito.Mockito._
 
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {
     def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String): KafkaConfig = {
       val props = new Properties
       props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
       props.setProperty(KafkaConfig.NodeIdProp, nodeId)
       props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
       props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
-      props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, nodeId.concat("@localhost:9093"))
-      if (processRoles.contains("broker"))
+      if (processRoles.contains("broker")) {
         props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
         props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+        if (!processRoles.contains("controller")) {
+          val nodeIdMod = (nodeId.toInt + 1)
+          props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, s"${nodeIdMod.toString}@localhost:9093")

Review comment:
       String interpolation already calls `toString`, so you don't need to call it explicitly.
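
A short sketch of the point, using hypothetical names (`InterpolationSketch`, `voterId`) rather than the test's own variables: the `s` interpolator converts the interpolated value to a string itself, so both forms produce the same result.

```scala
object InterpolationSketch {
  val voterId: Int = 2

  // Explicit conversion, as in the diff under review.
  val explicitCall: String = s"${voterId.toString}@localhost:9093"

  // Relying on the interpolator to convert the value.
  val implicitCall: String = s"$voterId@localhost:9093"
}
```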

##########
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##########
@@ -34,21 +34,30 @@ import org.mockito.Mockito._
 
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {

Review comment:
       The issue is that the directory created during each test needs to be deleted afterwards. This is where Raft creates the directory:
   https://github.com/apache/kafka/blob/9bc45d4e031bef63772b3d8d7c5828c0733a27b0/core/src/main/scala/kafka/raft/RaftManager.scala#L212-L215
   
   So you need to override that with a temp dir from https://github.com/apache/kafka/blob/9bc45d4e031bef63772b3d8d7c5828c0733a27b0/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L117
   and delete it after every test.
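
One way to get that create-then-delete behaviour, sketched with JDK calls only. This does not use the `TestUtils` helper linked above; `TempDirSketch`, `deleteRecursively`, and `withTempDir` are hypothetical names for illustration.

```scala
import java.io.File
import java.nio.file.Files

object TempDirSketch {
  // Hypothetical helper: remove a file or a whole directory tree.
  def deleteRecursively(f: File): Unit = {
    // listFiles returns null for non-directories, hence the Option wrapper.
    Option(f.listFiles).foreach(_.foreach(deleteRecursively))
    f.delete()
    ()
  }

  // Run `body` against a fresh temporary directory and always clean it up.
  def withTempDir[A](body: File => A): A = {
    val dir = Files.createTempDirectory("raft-test-").toFile
    try body(dir) finally deleteRecursively(dir)
  }
}
```

A test could wrap its body in `withTempDir` and point the log directory setting at the supplied `File`, so nothing is left behind even when an assertion fails.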

##########
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##########
@@ -34,21 +34,30 @@ import org.mockito.Mockito._
 
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {
     def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String): KafkaConfig = {
       val props = new Properties
       props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
       props.setProperty(KafkaConfig.NodeIdProp, nodeId)
       props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
       props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
-      props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, nodeId.concat("@localhost:9093"))
-      if (processRoles.contains("broker"))
+      if (processRoles.contains("broker")) {
         props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
         props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+        if (!processRoles.contains("controller")) {
+          val nodeIdMod = (nodeId.toInt + 1)
+          props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, s"${nodeIdMod.toString}@localhost:9093")
+        }
+      } 
+
+      if (processRoles.contains("controller")) {
+        props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, s"${nodeId}@localhost:9093")
+      }
+
       new KafkaConfig(props)
     }
 
-    val config = configWithProcessRolesAndNodeId(processRoles, nodeId)
+    val config = configWithProcessRolesAndNodeId(processRoles, nodeId) 

Review comment:
       Extra space at the end.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1949,6 +1951,19 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       )
     }
 
+    val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty
+    if (voterIds.nonEmpty) {
+      if (processRoles.contains(ControllerRole)) {
+        // Ensure that controllers use their node.id as a voter in controller.quorum.voters 
+        require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")

Review comment:
       In the interest of making it consistent with all the `KafkaConfig` properties, shouldn't we have a variable for `RaftConfig.QUORUM_VOTERS_CONFIG`? Maybe `KafkaConfig.QuorumVotersProp`. This comment applies to a few places in this change.
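
A sketch of what such an alias could look like, using stand-in objects rather than the real `RaftConfig` and `KafkaConfig` (the `...Sketch` names are hypothetical; the property keys are the ones named in this thread):

```scala
// Stand-in for the Java-style constant holder.
object RaftConfigSketch {
  val QUORUM_VOTERS_CONFIG: String = "controller.quorum.voters"
}

// Stand-in for the Scala config object, re-exporting the key under its own naming style.
object KafkaConfigSketch {
  val ProcessRolesProp: String = "process.roles"
  val QuorumVotersProp: String = RaftConfigSketch.QUORUM_VOTERS_CONFIG
}
```

Callers can then refer to `KafkaConfigSketch.QuorumVotersProp` next to the other `...Prop` constants while the key itself is still defined in one place.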

##########
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##########
@@ -34,21 +34,30 @@ import org.mockito.Mockito._
 
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {
     def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String): KafkaConfig = {
       val props = new Properties
       props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
       props.setProperty(KafkaConfig.NodeIdProp, nodeId)
       props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
       props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
-      props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, nodeId.concat("@localhost:9093"))
-      if (processRoles.contains("broker"))
+      if (processRoles.contains("broker")) {
         props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
         props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+        if (!processRoles.contains("controller")) {
+          val nodeIdMod = (nodeId.toInt + 1)

Review comment:
       Looks like this is the voter id so `val voterId = ...`

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -132,6 +133,8 @@ class KafkaApisTest {
       val properties = TestUtils.createBrokerConfig(brokerId, "")
       properties.put(KafkaConfig.NodeIdProp, brokerId.toString)
       properties.put(KafkaConfig.ProcessRolesProp, "broker")
+      val nodeIdMod = (brokerId + 1)
+      properties.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, s"${nodeIdMod.toString}@localhost:9093")

Review comment:
       String interpolation already calls `toString`. This comment applies to a few places.







[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683894514



##########
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##########
@@ -262,6 +262,8 @@ class KafkaConfigTest {
     props.put(KafkaConfig.ProcessRolesProp, "controller")
     props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
     props.put(KafkaConfig.NodeIdProp, "1")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")

Review comment:
       `props.put`

##########
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##########
@@ -96,6 +96,7 @@ class ControllerApisTest {
     props.put(KafkaConfig.NodeIdProp, nodeId: java.lang.Integer)
     props.put(KafkaConfig.ProcessRolesProp, "controller")
     props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
+    props.put(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")

Review comment:
       Remove the braces from the `nodeId` string interpolation.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -132,6 +132,8 @@ class KafkaApisTest {
       val properties = TestUtils.createBrokerConfig(brokerId, "")
       properties.put(KafkaConfig.NodeIdProp, brokerId.toString)
       properties.put(KafkaConfig.ProcessRolesProp, "broker")
+      val voterId = (brokerId + 1)
+      properties.setProperty(KafkaConfig.QuorumVotersProp, s"$voterId@localhost:9093")

Review comment:
       `props.put`

##########
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##########
@@ -1091,6 +1093,7 @@ class KafkaConfigTest {
     val largeBrokerId = 2000
     val props = new Properties()
     props.put(KafkaConfig.ProcessRolesProp, "broker")
+    props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")

Review comment:
       Use `props.put` to stay consistent with the rest of the method.
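
For string keys and values the two calls store the same entry, so the choice here is about consistency rather than behaviour. A small sketch with a hypothetical `PropsSketch` object:

```scala
import java.util.Properties

object PropsSketch {
  // Entry added through the inherited Hashtable-style put.
  val viaPut: Properties = {
    val props = new Properties()
    props.put("controller.quorum.voters", "2@localhost:9093")
    props
  }

  // Same entry added through the String-only setProperty.
  val viaSetProperty: Properties = {
    val props = new Properties()
    props.setProperty("controller.quorum.voters", "2@localhost:9093")
    props
  }
}
```

The difference only matters for non-string values: `put` accepts any object, while `getProperty` returns `null` for a value that is not a `String`.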







[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683626287



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1949,6 +1951,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       )
     }
 
+    val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty
+    if (voterIds.nonEmpty) {
+      if (processRoles.contains(ControllerRole)) {
+        // Ensure that controllers use their node.id as a voter in controller.quorum.voters 
+        require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+      } else {
+        // Ensure that the broker's node.id is not an id in controller.quorum.voters
+        require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${RaftConfig.QUORUM_VOTERS_CONFIG}=$voterIds")
+      }
+    }
+

Review comment:
       This code already checks for that on line 1955.







[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r684660475



##########
File path: core/src/test/scala/unit/kafka/KafkaConfigTest.scala
##########
@@ -79,6 +81,71 @@ class KafkaTest {
     assertThrows(classOf[FatalExitError], () => KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2"))))
   }
 
+  @Test
+  def testBrokerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that brokers do not use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testControllerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that controllers use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testColocatedRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))
+  }

Review comment:
       I think this is a good idea, thanks!







[GitHub] [kafka] jsancio commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683110083



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1949,6 +1951,15 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       )
     }
 
+    val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty
+    if (processRoles.contains(ControllerRole)) {
+      // Ensure that controllers use their node.id as a voter in controller.quorum.voters 
+      require(voterIds.contains(nodeId), s"The controller must contain a voter for it's ${KafkaConfig.NodeIdProp}=$nodeId in ${RaftConfig.QUORUM_VOTERS_CONFIG}.")
+    } else {
+      // Ensure that the broker's node.id is not an id in controller.quorum.voters
+      require(!voterIds.contains(nodeId), s"Since ${KafkaConfig.ProcessRolesProp}=broker, the the broker's ${KafkaConfig.NodeIdProp}=nodeId should not be in ${RaftConfig.QUORUM_VOTERS_CONFIG}")

Review comment:
       How about "If ${....process role config...} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${QUORUM_VOTERS_CONFIG}=$voterIds"?

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1949,6 +1951,15 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       )
     }
 
+    val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty
+    if (processRoles.contains(ControllerRole)) {
+      // Ensure that controllers use their node.id as a voter in controller.quorum.voters 
+      require(voterIds.contains(nodeId), s"The controller must contain a voter for it's ${KafkaConfig.NodeIdProp}=$nodeId in ${RaftConfig.QUORUM_VOTERS_CONFIG}.")

Review comment:
       How about "If ${....process role config...} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${QUORUM_VOTERS_CONFIG}=$voterIds"?
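
The rule behind the two suggested messages can be sketched in plain Scala, without any Kafka dependency. This is only an illustration: `QuorumVoterValidation`, `NodeSettings` and `validate` are hypothetical names, the config keys are written as string literals rather than through `KafkaConfig` or `RaftConfig`, and the `voterIds.nonEmpty` guard is assumed from the later revision of the diff.

```scala
object QuorumVoterValidation {
  // Hypothetical stand-in for the parsed configuration; this is not Kafka's KafkaConfig.
  final case class NodeSettings(processRoles: Set[String], nodeId: Int, voterIds: Set[Int])

  // Throws IllegalArgumentException (via require) when the node id and the
  // process roles disagree with the configured voter set.
  def validate(s: NodeSettings): Unit = {
    if (s.voterIds.nonEmpty) {
      if (s.processRoles.contains("controller")) {
        // A controller must be one of the voters.
        require(s.voterIds.contains(s.nodeId),
          s"If process.roles contains the 'controller' role, the node id ${s.nodeId} " +
            s"must be included in the set of voters controller.quorum.voters=${s.voterIds}")
      } else {
        // A broker-only node must not be one of the voters.
        require(!s.voterIds.contains(s.nodeId),
          s"If process.roles does not contain the 'controller' role, the node id ${s.nodeId} " +
            s"must not be included in the set of voters controller.quorum.voters=${s.voterIds}")
      }
    }
  }
}
```

Because `require` throws `IllegalArgumentException`, a check of this shape matches the exception type that the new `KafkaConfigTest` cases assert on.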







[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683960767



##########
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##########
@@ -214,6 +220,12 @@ class KafkaRaftManager[T](
     KafkaRaftManager.createLogDirectory(new File(config.metadataLogDir), logDirName)
   }
 
+  // visible for testing cleanup
+  private[raft] def deleteDataDir(): Unit = {
+    val logDirName = Log.logDirName(topicPartition)
+    KafkaRaftManager.deleteLogDirectory(new File(config.metadataLogDir), logDirName)

Review comment:
       @jsancio What do you think about exposing this API at package level so that tests can clean up their log directories? I'm not sure how to make `createDataDir` use a `tempDir` from the tests, since it is called internally in `KafkaRaftManager` without an argument.
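
One possible way to steer the directory from a test, without touching `createDataDir` itself, is through the configuration, because the diff builds the path from `config.metadataLogDir`. The sketch below uses only `java.util.Properties`. `MetadataLogDirSketch`, `propsFor` and `dataDir` are hypothetical names, and the literal key `metadata.log.dir` is an assumption about what `KafkaConfig.MetadataLogDirProp` resolves to.

```scala
import java.io.File
import java.util.Properties

object MetadataLogDirSketch {
  // Assumed literal key behind KafkaConfig.MetadataLogDirProp.
  val MetadataLogDirKey = "metadata.log.dir"

  // Build properties whose metadata log directory points at the given directory.
  def propsFor(logDir: File): Properties = {
    val props = new Properties
    props.setProperty(MetadataLogDirKey, logDir.getPath)
    props
  }

  // Mirrors the shape of the path built in the diff: <metadata log dir>/<logDirName>.
  def dataDir(props: Properties, logDirName: String): File =
    new File(new File(props.getProperty(MetadataLogDirKey)).getAbsolutePath, logDirName)
}
```

With the directory routed through the properties, the test owns the temporary directory and can delete it itself, so no delete helper is needed in `core/src/main`.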







[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r685542380



##########
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##########
@@ -28,27 +28,41 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.raft.KafkaRaftClient
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.Mockito._
 
+import java.io.File
+
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
-    def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String): KafkaConfig = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {
+    def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String, logDir: File): KafkaConfig = {
       val props = new Properties
+      props.setProperty(KafkaConfig.MetadataLogDirProp, logDir.getPath)
       props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
       props.setProperty(KafkaConfig.NodeIdProp, nodeId)
       props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
       props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
-      props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, nodeId.concat("@localhost:9093"))
-      if (processRoles.contains("broker"))
+      if (processRoles.contains("broker")) {
         props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
         props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+        if (!processRoles.contains("controller")) {
+          val voterId = (nodeId.toInt + 1)
+          props.setProperty(KafkaConfig.QuorumVotersProp, s"${voterId}@localhost:9093")
+        }
+      } 
+
+      if (processRoles.contains("controller")) {
+        props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")
+      }
+
       new KafkaConfig(props)
     }
 
-    val config = configWithProcessRolesAndNodeId(processRoles, nodeId)
+    val logDir = TestUtils.tempDirectory()

Review comment:
       thanks guys







[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r685559973



##########
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##########
@@ -28,27 +28,41 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.raft.KafkaRaftClient
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.Mockito._
 
+import java.io.File
+
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
-    def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String): KafkaConfig = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {
+    def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String, logDir: File): KafkaConfig = {
       val props = new Properties
+      props.setProperty(KafkaConfig.MetadataLogDirProp, logDir.getPath)
       props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
       props.setProperty(KafkaConfig.NodeIdProp, nodeId)
       props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
       props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
-      props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, nodeId.concat("@localhost:9093"))
-      if (processRoles.contains("broker"))
+      if (processRoles.contains("broker")) {
         props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
         props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+        if (!processRoles.contains("controller")) {
+          val voterId = (nodeId.toInt + 1)
+          props.setProperty(KafkaConfig.QuorumVotersProp, s"${voterId}@localhost:9093")
+        }
+      } 
+
+      if (processRoles.contains("controller")) {
+        props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")
+      }
+
       new KafkaConfig(props)
     }
 
-    val config = configWithProcessRolesAndNodeId(processRoles, nodeId)
+    val logDir = TestUtils.tempDirectory()

Review comment:
       https://github.com/apache/kafka/pull/11193







[GitHub] [kafka] jsancio commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r685275346



##########
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##########
@@ -79,6 +80,11 @@ object KafkaRaftManager {
     Files.createDirectories(dir.toPath)
     dir
   }
+
+  private def deleteLogDirectory(logDir: File, logDirName: String): Unit = {
+    val dir = new Directory(new File(logDir.getAbsolutePath, logDirName))
+    dir.deleteRecursively()
+  }

Review comment:
       This code should not be in `core/src/main` since it is not used by `core/src/main`.

##########
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##########
@@ -214,6 +220,12 @@ class KafkaRaftManager[T](
     KafkaRaftManager.createLogDirectory(new File(config.metadataLogDir), logDirName)
   }
 
+  // visible for testing cleanup
+  private[raft] def deleteDataDir(): Unit = {
+    val logDirName = Log.logDirName(topicPartition)
+    KafkaRaftManager.deleteLogDirectory(new File(config.metadataLogDir), logDirName)

Review comment:
       This code should not be in `core/src/main` since it is not used by `core/src/main`.







[GitHub] [kafka] mumrah commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r685414248



##########
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##########
@@ -28,27 +28,41 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.raft.KafkaRaftClient
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.Mockito._
 
+import java.io.File
+
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
-    def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String): KafkaConfig = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {
+    def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String, logDir: File): KafkaConfig = {
       val props = new Properties
+      props.setProperty(KafkaConfig.MetadataLogDirProp, logDir.getPath)
       props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
       props.setProperty(KafkaConfig.NodeIdProp, nodeId)
       props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
       props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
-      props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, nodeId.concat("@localhost:9093"))
-      if (processRoles.contains("broker"))
+      if (processRoles.contains("broker")) {
         props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
         props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+        if (!processRoles.contains("controller")) {
+          val voterId = (nodeId.toInt + 1)
+          props.setProperty(KafkaConfig.QuorumVotersProp, s"${voterId}@localhost:9093")
+        }
+      } 
+
+      if (processRoles.contains("controller")) {
+        props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")
+      }
+
       new KafkaConfig(props)
     }
 
-    val config = configWithProcessRolesAndNodeId(processRoles, nodeId)
+    val logDir = TestUtils.tempDirectory()

Review comment:
       Yeah, probably a good idea. It looks like other tests that use `tempDirectory` clean up with `o.a.k.common.utils.Utils.delete`.








[GitHub] [kafka] jsancio commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
jsancio commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r685410401



##########
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##########
@@ -28,27 +28,41 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.raft.KafkaRaftClient
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.Mockito._
 
+import java.io.File
+
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
-    def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String): KafkaConfig = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {
+    def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String, logDir: File): KafkaConfig = {
       val props = new Properties
+      props.setProperty(KafkaConfig.MetadataLogDirProp, logDir.getPath)
       props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
       props.setProperty(KafkaConfig.NodeIdProp, nodeId)
       props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
       props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
-      props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, nodeId.concat("@localhost:9093"))
-      if (processRoles.contains("broker"))
+      if (processRoles.contains("broker")) {
         props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
         props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+        if (!processRoles.contains("controller")) {
+          val voterId = (nodeId.toInt + 1)
+          props.setProperty(KafkaConfig.QuorumVotersProp, s"${voterId}@localhost:9093")
+        }
+      } 
+
+      if (processRoles.contains("controller")) {
+        props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")
+      }
+
       new KafkaConfig(props)
     }
 
-    val config = configWithProcessRolesAndNodeId(processRoles, nodeId)
+    val logDir = TestUtils.tempDirectory()

Review comment:
       Should we remove these directories after every test? As far as I can tell, `TestUtils.tempDirectory()` does not delete the directory until all of the tests in `core` have finished and the JVM terminates.
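
Per-test cleanup of this kind can be sketched with the JDK alone. This is a minimal illustration, not the change made in the PR: `TempDirCleanup`, `deleteRecursively` and `withTempDir` are hypothetical helpers, and `Files.createTempDirectory` stands in for `TestUtils.tempDirectory()`.

```scala
import java.io.IOException
import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes

object TempDirCleanup {
  // Recursively delete a directory tree, files first and then their parent directories.
  def deleteRecursively(root: Path): Unit = {
    if (Files.exists(root)) {
      Files.walkFileTree(root, new SimpleFileVisitor[Path] {
        override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
          Files.delete(file)
          FileVisitResult.CONTINUE
        }
        override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
          Files.delete(dir)
          FileVisitResult.CONTINUE
        }
      })
    }
  }

  // Run `body` against a fresh temporary directory and delete the directory
  // afterwards, even if `body` throws.
  def withTempDir[A](prefix: String)(body: Path => A): A = {
    val dir = Files.createTempDirectory(prefix)
    try body(dir)
    finally deleteRecursively(dir)
  }
}
```

A test body would then be wrapped as `TempDirCleanup.withTempDir("raft-test") { dir => ... }`, so the directory is gone as soon as that test finishes rather than at JVM exit.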







[GitHub] [kafka] cmccabe merged pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #11179:
URL: https://github.com/apache/kafka/pull/11179


   





[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r684430743



##########
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##########
@@ -34,21 +34,30 @@ import org.mockito.Mockito._
 
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {

Review comment:
       Would this class need to extend KafkaRaftManager to override the directory field in that class with tempDir?







[GitHub] [kafka] cmccabe commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r685480957



##########
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##########
@@ -28,27 +28,41 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.raft.KafkaRaftClient
 import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.Mockito._
 
+import java.io.File
+
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
-    def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String): KafkaConfig = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {
+    def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String, logDir: File): KafkaConfig = {
       val props = new Properties
+      props.setProperty(KafkaConfig.MetadataLogDirProp, logDir.getPath)
       props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
       props.setProperty(KafkaConfig.NodeIdProp, nodeId)
       props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
       props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
-      props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, nodeId.concat("@localhost:9093"))
-      if (processRoles.contains("broker"))
+      if (processRoles.contains("broker")) {
         props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
         props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+        if (!processRoles.contains("controller")) {
+          val voterId = (nodeId.toInt + 1)
+          props.setProperty(KafkaConfig.QuorumVotersProp, s"${voterId}@localhost:9093")
+        }
+      } 
+
+      if (processRoles.contains("controller")) {
+        props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")
+      }
+
       new KafkaConfig(props)
     }
 
-    val config = configWithProcessRolesAndNodeId(processRoles, nodeId)
+    val logDir = TestUtils.tempDirectory()

Review comment:
       @jsancio, @mumrah: Can you file a follow-up JIRA for this? It can also cover the other raft integration and unit tests.







[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r684429919



##########
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##########
@@ -214,6 +220,12 @@ class KafkaRaftManager[T](
     KafkaRaftManager.createLogDirectory(new File(config.metadataLogDir), logDirName)
   }
 
+  // visible for testing cleanup
+  private[raft] def deleteDataDir(): Unit = {
+    val logDirName = Log.logDirName(topicPartition)
+    KafkaRaftManager.deleteLogDirectory(new File(config.metadataLogDir), logDirName)

Review comment:
       Would this class need to extend KafkaRaftManager to override the directory field in that class with tempDir?







[GitHub] [kafka] showuon commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r684631273



##########
File path: core/src/test/scala/unit/kafka/KafkaConfigTest.scala
##########
@@ -79,6 +81,71 @@ class KafkaTest {
     assertThrows(classOf[FatalExitError], () => KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2"))))
   }
 
+  @Test
+  def testBrokerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that brokers do not use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testControllerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that controllers use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testColocatedRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters 

Review comment:
       I think this comment doesn't describe what this test checks. Maybe update it to: "... to check that if the processRoles property is set, `controller.quorum.voters` must not be empty".

##########
File path: core/src/test/scala/unit/kafka/KafkaConfigTest.scala
##########
@@ -79,6 +81,71 @@ class KafkaTest {
     assertThrows(classOf[FatalExitError], () => KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2"))))
   }
 
+  @Test
+  def testBrokerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that brokers do not use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testControllerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that controllers use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testColocatedRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))
+  }

Review comment:
       nit: After this `assertThrows`, maybe we can also test that no exception is thrown once the `ProcessRolesProp` value is cleared, e.g.:
   ```scala
   assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))

   // Ensure that no exception is thrown once the ProcessRolesProp value is cleared
   propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "")
   KafkaConfig.fromProps(propertiesFile)
   ```
   What do you think?
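
   For reference, the node id rules that the tests in this hunk exercise can be sketched as a standalone check. This is a minimal sketch, not the code in `KafkaConfig.scala`: `QuorumConfigCheck`, `parseVoterIds` and `validate` are hypothetical names, and the parser is a simplified stand-in for `RaftConfig.parseVoterConnections` that only extracts the id before the `@`. It assumes a broker-only node must not appear in the voter set and a node with the controller role must, and it does not model the `ConfigException` raised for an empty `controller.quorum.voters`.

   ```scala
   object QuorumConfigCheck {
     // Hypothetical simplification: turn "1@host:9092,2@host:9093" into Set(1, 2).
     def parseVoterIds(quorumVoters: String): Set[Int] =
       quorumVoters.split(",")
         .map(_.trim)
         .filter(_.nonEmpty)
         .map(entry => entry.takeWhile(_ != '@').toInt)
         .toSet

     // Throws IllegalArgumentException (via require) when the node id and roles disagree with the voter set.
     def validate(processRoles: Set[String], nodeId: Int, quorumVoters: String): Unit = {
       val voterIds = parseVoterIds(quorumVoters)
       if (voterIds.nonEmpty) {
         if (processRoles.contains("controller")) {
           // Controllers (including colocated controller,broker nodes) must be voters.
           require(voterIds.contains(nodeId),
             s"node id $nodeId must be included in the set of voters $voterIds")
         } else {
           // Broker-only nodes must not be voters.
           require(!voterIds.contains(nodeId),
             s"node id $nodeId must not be included in the set of voters $voterIds")
         }
       }
     }
   }
   ```

   With this sketch, `QuorumConfigCheck.validate(Set("broker"), 1, "1@localhost:9092")` throws, while the same call with node id `2` returns normally, mirroring `testBrokerRoleNodeIdValidation`.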







[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r684692820



##########
File path: core/src/test/scala/unit/kafka/KafkaConfigTest.scala
##########
@@ -79,6 +81,71 @@ class KafkaTest {
     assertThrows(classOf[FatalExitError], () => KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2"))))
   }
 
+  @Test
+  def testBrokerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that brokers do not use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testControllerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that controllers use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testColocatedRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))
+  }

Review comment:
   ```scala
   // Ensure that if neither process.roles nor controller.quorum.voters is populated, then an exception is thrown if zookeeper.connect is not defined
   propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "")
   assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))

   // Ensure that no exception is thrown once zookeeper.connect is defined
   propertiesFile.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
   KafkaConfig.fromProps(propertiesFile)
   ```







[GitHub] [kafka] dielhennr commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

Posted by GitBox <gi...@apache.org>.
dielhennr commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683879932



##########
File path: core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
##########
@@ -34,21 +34,30 @@ import org.mockito.Mockito._
 
 class RaftManagerTest {
 
-  private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
+  private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {

Review comment:
       I did this using `scala.reflect.io.Directory.deleteRecursively` as you were reviewing.
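
   A minimal sketch of that kind of cleanup, assuming the thing being removed is a temporary directory created for the test (the thread does not show which directory is meant). `TempDirCleanup` and `withTempDir` are hypothetical helper names, and `scala.reflect.io.Directory` needs the Scala 2 `scala-reflect` library on the classpath.

   ```scala
   import java.io.File
   import java.nio.file.Files
   import scala.reflect.io.Directory

   object TempDirCleanup {
     // Hypothetical helper: create a temp directory, run the body, then always delete the directory tree.
     def withTempDir[T](prefix: String)(body: File => T): T = {
       val dir = Files.createTempDirectory(prefix).toFile
       try body(dir)
       finally {
         // deleteRecursively() removes the directory and everything under it; the Boolean result is ignored here.
         new Directory(dir).deleteRecursively()
       }
     }
   }
   ```

   A test could wrap its body in `TempDirCleanup.withTempDir("raft-test") { dir => ... }` so the directory is removed even when an assertion fails.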



