You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/07 13:25:47 UTC

[GitHub] [kafka] showuon commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

showuon commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r684631273



##########
File path: core/src/test/scala/unit/kafka/KafkaConfigTest.scala
##########
@@ -79,6 +81,71 @@ class KafkaTest {
     assertThrows(classOf[FatalExitError], () => KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2"))))
   }
 
+  @Test
+  def testBrokerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that brokers do not use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testControllerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that controllers use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testColocatedRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters 

Review comment:
       I think this comment doesn't describe what this test actually verifies. Maybe update it to: "... to check that if the processRoles property is set, `controller.quorum.voters` must not be empty"

##########
File path: core/src/test/scala/unit/kafka/KafkaConfigTest.scala
##########
@@ -79,6 +81,71 @@ class KafkaTest {
     assertThrows(classOf[FatalExitError], () => KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2"))))
   }
 
+  @Test
+  def testBrokerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that brokers do not use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testControllerRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that controllers use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testColocatedRoleNodeIdValidation(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+    // Ensure that with a valid config no exception is thrown
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+    KafkaConfig.fromProps(propertiesFile)
+  }
+
+  @Test
+  def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = {
+    // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters 
+    val propertiesFile = new Properties
+    propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+    propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+    propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "")
+    setListenerProps(propertiesFile)
+    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))
+  }

Review comment:
       nit: Maybe, after this `assertThrows`, we can also verify that no exception is thrown once the `ProcessRolesProp` value is removed. For example:
   ```scala
   assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))

   // Ensure that if removing the ProcessRolesProp value, no exception is thrown
   propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "")
   KafkaConfig.fromProps(propertiesFile)
   ```
   What do you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org