You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "mumrah (via GitHub)" <gi...@apache.org> on 2023/05/24 19:12:23 UTC

[GitHub] [kafka] mumrah commented on a diff in pull request #13729: KAFKA-14970: Dual write mode testing for SCRAM and Quota

mumrah commented on code in PR #13729:
URL: https://github.com/apache/kafka/pull/13729#discussion_r1204650002


##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -187,13 +188,74 @@ class ZkMigrationIntegrationTest {
     migrationState = migrationClient.releaseControllerLeadership(migrationState)
   }
 
+  // SCRAM and Quota are intermixed. Test SCRAM Only here
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_5_IV2, serverProperties = Array(
+    new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
+    new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+  ))
+  def testDualWriteScram(zkCluster: ClusterInstance): Unit = {
+    var admin = zkCluster.createAdminClient()
+    createUserScramCredentials(admin).all().get(60, TimeUnit.SECONDS)
+    admin.close()
+
+    val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+
+    // Bootstrap the ZK cluster ID into KRaft
+    val clusterId = zkCluster.clusterId()
+    val kraftCluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_5_IV2).
+        setClusterId(Uuid.fromString(clusterId)).
+        setNumBrokerNodes(0).
+        setNumControllerNodes(1).build())
+      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+      .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .build()
+    try {
+      kraftCluster.format()
+      kraftCluster.startup()
+      val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
+
+      // Enable migration configs and restart brokers
+      log.info("Restart brokers in migration mode")
+      val clientProps = kraftCluster.controllerClientProperties()
+      val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
+      zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
+      zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+      zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+      zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      zkCluster.rollingBrokerRestart()
+      zkCluster.waitForReadyBrokers()
+      readyFuture.get(30, TimeUnit.SECONDS)
+
+      // Wait for migration to begin
+      log.info("Waiting for ZK migration to begin")
+      TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
+
+      // Alter the metadata
+      log.info("Updating metadata with AdminClient")
+      admin = zkCluster.createAdminClient()
+      alterUserScramCredentials(admin).all().get(60, TimeUnit.SECONDS)
+
+      // Verify the changes made to KRaft are seen in ZK
+      log.info("Verifying metadata changes with ZK")
+      verifyUserScramCredentials(zkClient)
+    } finally {
+      zkCluster.stop()
+      kraftCluster.close()
+    }
+  }
+
+  // SCRAM and Quota are intermixed. Test Quota Only here
   @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_4_IV0, serverProperties = Array(
     new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
     new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
     new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
     new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
   ))
-  def testDualWrite(zkCluster: ClusterInstance): Unit = {
+  def testDualWriteQuota(zkCluster: ClusterInstance): Unit = {

Review Comment:
   Can we leave this test name the same? If we change test names, Jenkins loses track of the execution history.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org