You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yu...@apache.org on 2022/06/20 14:07:27 UTC

[spark] branch master updated: [SPARK-39530][SS][TESTS] Fix `KafkaTestUtils` to support IPv6

This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 550f5fe42dc [SPARK-39530][SS][TESTS] Fix `KafkaTestUtils` to support IPv6
550f5fe42dc is described below

commit 550f5fe42dcb078e1cf9460ac7a9a7689918e92b
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Mon Jun 20 22:07:12 2022 +0800

    [SPARK-39530][SS][TESTS] Fix `KafkaTestUtils` to support IPv6
    
    ### What changes were proposed in this pull request?
    
    This PR aims to fix `KafkaTestUtils` to support IPv6.
    
    ### Why are the changes needed?
    
    Currently, the test suite is using a hard-coded `127.0.0.1` like the following.
    ```
    props.put("listeners", "SASL_PLAINTEXT://127.0.0.1:0")
    props.put("advertised.listeners", "SASL_PLAINTEXT://127.0.0.1:0")
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is a test-only change.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    Closes #36923 from dongjoon-hyun/SPARK-39530.
    
    Authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Yuming Wang <yu...@ebay.com>
---
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala | 27 +++++++++++-----------
 .../spark/streaming/kafka010/KafkaTestUtils.scala  | 12 ++++++----
 2 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index c5d2a99d156..58b8778c963 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.kafka010
 
 import java.io.{File, IOException}
-import java.net.{InetAddress, InetSocketAddress}
+import java.net.InetSocketAddress
 import java.nio.charset.StandardCharsets
 import java.util.{Collections, Properties, UUID}
 import java.util.concurrent.TimeUnit
@@ -68,13 +68,13 @@ class KafkaTestUtils(
 
   private val JAVA_AUTH_CONFIG = "java.security.auth.login.config"
 
-  private val localCanonicalHostName = InetAddress.getLoopbackAddress().getCanonicalHostName()
-  logInfo(s"Local host name is $localCanonicalHostName")
+  private val localHostNameForURI = Utils.localHostNameForURI()
+  logInfo(s"Local host name is $localHostNameForURI")
 
   private var kdc: MiniKdc = _
 
   // Zookeeper related configurations
-  private val zkHost = localCanonicalHostName
+  private val zkHost = localHostNameForURI
   private var zkPort: Int = 0
   private val zkConnectionTimeout = 60000
   private val zkSessionTimeout = 10000
@@ -83,12 +83,12 @@ class KafkaTestUtils(
   private var zkClient: KafkaZkClient = _
 
   // Kafka broker related configurations
-  private val brokerHost = localCanonicalHostName
+  private val brokerHost = localHostNameForURI
   private var brokerPort = 0
   private var brokerConf: KafkaConfig = _
 
   private val brokerServiceName = "kafka"
-  private val clientUser = s"client/$localCanonicalHostName"
+  private val clientUser = s"client/$localHostNameForURI"
   private var clientKeytabFile: File = _
 
   // Kafka broker server
@@ -202,17 +202,17 @@ class KafkaTestUtils(
     assert(kdcReady, "KDC should be set up beforehand")
     val baseDir = Utils.createTempDir()
 
-    val zkServerUser = s"zookeeper/$localCanonicalHostName"
+    val zkServerUser = s"zookeeper/$localHostNameForURI"
     val zkServerKeytabFile = new File(baseDir, "zookeeper.keytab")
     kdc.createPrincipal(zkServerKeytabFile, zkServerUser)
     logDebug(s"Created keytab file: ${zkServerKeytabFile.getAbsolutePath()}")
 
-    val zkClientUser = s"zkclient/$localCanonicalHostName"
+    val zkClientUser = s"zkclient/$localHostNameForURI"
     val zkClientKeytabFile = new File(baseDir, "zkclient.keytab")
     kdc.createPrincipal(zkClientKeytabFile, zkClientUser)
     logDebug(s"Created keytab file: ${zkClientKeytabFile.getAbsolutePath()}")
 
-    val kafkaServerUser = s"kafka/$localCanonicalHostName"
+    val kafkaServerUser = s"kafka/$localHostNameForURI"
     val kafkaServerKeytabFile = new File(baseDir, "kafka.keytab")
     kdc.createPrincipal(kafkaServerKeytabFile, kafkaServerUser)
     logDebug(s"Created keytab file: ${kafkaServerKeytabFile.getAbsolutePath()}")
@@ -489,7 +489,7 @@ class KafkaTestUtils(
   protected def brokerConfiguration: Properties = {
     val props = new Properties()
     props.put("broker.id", "0")
-    props.put("listeners", s"PLAINTEXT://127.0.0.1:$brokerPort")
+    props.put("listeners", s"PLAINTEXT://$localHostNameForURI:$brokerPort")
     props.put("log.dir", Utils.createTempDir().getAbsolutePath)
     props.put("zookeeper.connect", zkAddress)
     props.put("zookeeper.connection.timeout.ms", "60000")
@@ -505,8 +505,8 @@ class KafkaTestUtils(
     props.put("transaction.state.log.min.isr", "1")
 
     if (secure) {
-      props.put("listeners", "SASL_PLAINTEXT://127.0.0.1:0")
-      props.put("advertised.listeners", "SASL_PLAINTEXT://127.0.0.1:0")
+      props.put("listeners", s"SASL_PLAINTEXT://$localHostNameForURI:0")
+      props.put("advertised.listeners", s"SASL_PLAINTEXT://$localHostNameForURI:0")
       props.put("inter.broker.listener.name", "SASL_PLAINTEXT")
       props.put("delegation.token.master.key", UUID.randomUUID().toString)
       props.put("sasl.enabled.mechanisms", "GSSAPI,SCRAM-SHA-512")
@@ -648,7 +648,8 @@ class KafkaTestUtils(
     val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
     val (ip, port) = {
       val splits = zkConnect.split(":")
-      (splits(0), splits(1).toInt)
+      val port = splits(splits.length - 1)
+      (zkConnect.substring(0, zkConnect.length - port.length - 1), port.toInt)
     }
     val factory = new NIOServerCnxnFactory()
     factory.configure(new InetSocketAddress(ip, port), 16)
diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index dd8d66f1fc0..d341b6977b2 100644
--- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -50,9 +50,10 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
  * The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
  */
 private[kafka010] class KafkaTestUtils extends Logging {
+  private val localHostNameForURI = Utils.localHostNameForURI()
 
   // Zookeeper related configurations
-  private val zkHost = "127.0.0.1"
+  private val zkHost = localHostNameForURI
   private var zkPort: Int = 0
   private val zkConnectionTimeout = 60000
   private val zkSessionTimeout = 10000
@@ -63,7 +64,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
   private var admClient: AdminZkClient = _
 
   // Kafka broker related configurations
-  private val brokerHost = "127.0.0.1"
+  private val brokerHost = localHostNameForURI
   private var brokerPort = 0
   private var brokerConf: KafkaConfig = _
 
@@ -239,8 +240,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
   private def brokerConfiguration: Properties = {
     val props = new Properties()
     props.put("broker.id", "0")
-    props.put("host.name", "127.0.0.1")
-    props.put("advertised.host.name", "127.0.0.1")
+    props.put("host.name", localHostNameForURI)
+    props.put("advertised.host.name", localHostNameForURI)
     props.put("port", brokerPort.toString)
     props.put("log.dir", brokerLogDir)
     props.put("zookeeper.connect", zkAddress)
@@ -319,7 +320,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
     val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
     val (ip, port) = {
       val splits = zkConnect.split(":")
-      (splits(0), splits(1).toInt)
+      val port = splits(splits.length - 1)
+      (zkConnect.substring(0, zkConnect.length - port.length - 1), port.toInt)
     }
     val factory = new NIOServerCnxnFactory()
     factory.configure(new InetSocketAddress(ip, port), 16)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org