Posted to commits@spark.apache.org by yu...@apache.org on 2022/06/20 14:07:27 UTC
[spark] branch master updated: [SPARK-39530][SS][TESTS] Fix `KafkaTestUtils` to support IPv6
This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 550f5fe42dc [SPARK-39530][SS][TESTS] Fix `KafkaTestUtils` to support IPv6
550f5fe42dc is described below
commit 550f5fe42dcb078e1cf9460ac7a9a7689918e92b
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Mon Jun 20 22:07:12 2022 +0800
[SPARK-39530][SS][TESTS] Fix `KafkaTestUtils` to support IPv6
### What changes were proposed in this pull request?
This PR aims to fix `KafkaTestUtils` to support IPv6.
### Why are the changes needed?
Currently, the test suite uses a hard-coded `127.0.0.1`, as in the following snippet, so it cannot run in an IPv6-only environment.
```
props.put("listeners", "SASL_PLAINTEXT://127.0.0.1:0")
props.put("advertised.listeners", "SASL_PLAINTEXT://127.0.0.1:0")
```
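A minimal sketch of the replacement approach, not the exact test code: derive the listener host from `Utils.localHostNameForURI()` (as the diff below does), which returns a URI-safe host name; for an IPv6 address this is expected to be a bracketed literal such as `[::1]` that can be dropped straight into the listener URI. The object and method names here (`ListenerPropsExample`, `secureListenerProps`) are illustrative only.
```
import java.util.Properties

object ListenerPropsExample {
  // Illustrative only: "localHostNameForURI" stands in for the value of
  // org.apache.spark.util.Utils.localHostNameForURI(), which yields a
  // URI-safe host name (an IPv6 literal is assumed to come back bracketed,
  // e.g. "[::1]"), instead of a hard-coded 127.0.0.1.
  def secureListenerProps(localHostNameForURI: String): Properties = {
    val props = new Properties()
    props.put("listeners", s"SASL_PLAINTEXT://$localHostNameForURI:0")
    props.put("advertised.listeners", s"SASL_PLAINTEXT://$localHostNameForURI:0")
    props
  }

  def main(args: Array[String]): Unit = {
    // The same code works for both stacks; only the host string differs.
    println(secureListenerProps("127.0.0.1").getProperty("listeners")) // SASL_PLAINTEXT://127.0.0.1:0
    println(secureListenerProps("[::1]").getProperty("listeners"))     // SASL_PLAINTEXT://[::1]:0
  }
}
```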
### Does this PR introduce _any_ user-facing change?
No. This is a test-only change.
### How was this patch tested?
Pass the CIs.
Closes #36923 from dongjoon-hyun/SPARK-39530.
Authored-by: Dongjoon Hyun <do...@apache.org>
Signed-off-by: Yuming Wang <yu...@ebay.com>
---
.../apache/spark/sql/kafka010/KafkaTestUtils.scala | 27 +++++++++++-----------
.../spark/streaming/kafka010/KafkaTestUtils.scala | 12 ++++++----
2 files changed, 21 insertions(+), 18 deletions(-)
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index c5d2a99d156..58b8778c963 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.kafka010
import java.io.{File, IOException}
-import java.net.{InetAddress, InetSocketAddress}
+import java.net.InetSocketAddress
import java.nio.charset.StandardCharsets
import java.util.{Collections, Properties, UUID}
import java.util.concurrent.TimeUnit
@@ -68,13 +68,13 @@ class KafkaTestUtils(
private val JAVA_AUTH_CONFIG = "java.security.auth.login.config"
- private val localCanonicalHostName = InetAddress.getLoopbackAddress().getCanonicalHostName()
- logInfo(s"Local host name is $localCanonicalHostName")
+ private val localHostNameForURI = Utils.localHostNameForURI()
+ logInfo(s"Local host name is $localHostNameForURI")
private var kdc: MiniKdc = _
// Zookeeper related configurations
- private val zkHost = localCanonicalHostName
+ private val zkHost = localHostNameForURI
private var zkPort: Int = 0
private val zkConnectionTimeout = 60000
private val zkSessionTimeout = 10000
@@ -83,12 +83,12 @@ class KafkaTestUtils(
private var zkClient: KafkaZkClient = _
// Kafka broker related configurations
- private val brokerHost = localCanonicalHostName
+ private val brokerHost = localHostNameForURI
private var brokerPort = 0
private var brokerConf: KafkaConfig = _
private val brokerServiceName = "kafka"
- private val clientUser = s"client/$localCanonicalHostName"
+ private val clientUser = s"client/$localHostNameForURI"
private var clientKeytabFile: File = _
// Kafka broker server
@@ -202,17 +202,17 @@ class KafkaTestUtils(
assert(kdcReady, "KDC should be set up beforehand")
val baseDir = Utils.createTempDir()
- val zkServerUser = s"zookeeper/$localCanonicalHostName"
+ val zkServerUser = s"zookeeper/$localHostNameForURI"
val zkServerKeytabFile = new File(baseDir, "zookeeper.keytab")
kdc.createPrincipal(zkServerKeytabFile, zkServerUser)
logDebug(s"Created keytab file: ${zkServerKeytabFile.getAbsolutePath()}")
- val zkClientUser = s"zkclient/$localCanonicalHostName"
+ val zkClientUser = s"zkclient/$localHostNameForURI"
val zkClientKeytabFile = new File(baseDir, "zkclient.keytab")
kdc.createPrincipal(zkClientKeytabFile, zkClientUser)
logDebug(s"Created keytab file: ${zkClientKeytabFile.getAbsolutePath()}")
- val kafkaServerUser = s"kafka/$localCanonicalHostName"
+ val kafkaServerUser = s"kafka/$localHostNameForURI"
val kafkaServerKeytabFile = new File(baseDir, "kafka.keytab")
kdc.createPrincipal(kafkaServerKeytabFile, kafkaServerUser)
logDebug(s"Created keytab file: ${kafkaServerKeytabFile.getAbsolutePath()}")
@@ -489,7 +489,7 @@ class KafkaTestUtils(
protected def brokerConfiguration: Properties = {
val props = new Properties()
props.put("broker.id", "0")
- props.put("listeners", s"PLAINTEXT://127.0.0.1:$brokerPort")
+ props.put("listeners", s"PLAINTEXT://$localHostNameForURI:$brokerPort")
props.put("log.dir", Utils.createTempDir().getAbsolutePath)
props.put("zookeeper.connect", zkAddress)
props.put("zookeeper.connection.timeout.ms", "60000")
@@ -505,8 +505,8 @@ class KafkaTestUtils(
props.put("transaction.state.log.min.isr", "1")
if (secure) {
- props.put("listeners", "SASL_PLAINTEXT://127.0.0.1:0")
- props.put("advertised.listeners", "SASL_PLAINTEXT://127.0.0.1:0")
+ props.put("listeners", s"SASL_PLAINTEXT://$localHostNameForURI:0")
+ props.put("advertised.listeners", s"SASL_PLAINTEXT://$localHostNameForURI:0")
props.put("inter.broker.listener.name", "SASL_PLAINTEXT")
props.put("delegation.token.master.key", UUID.randomUUID().toString)
props.put("sasl.enabled.mechanisms", "GSSAPI,SCRAM-SHA-512")
@@ -648,7 +648,8 @@ class KafkaTestUtils(
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
val (ip, port) = {
val splits = zkConnect.split(":")
- (splits(0), splits(1).toInt)
+ val port = splits(splits.length - 1)
+ (zkConnect.substring(0, zkConnect.length - port.length - 1), port.toInt)
}
val factory = new NIOServerCnxnFactory()
factory.configure(new InetSocketAddress(ip, port), 16)
diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index dd8d66f1fc0..d341b6977b2 100644
--- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -50,9 +50,10 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
* The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
*/
private[kafka010] class KafkaTestUtils extends Logging {
+ private val localHostNameForURI = Utils.localHostNameForURI()
// Zookeeper related configurations
- private val zkHost = "127.0.0.1"
+ private val zkHost = localHostNameForURI
private var zkPort: Int = 0
private val zkConnectionTimeout = 60000
private val zkSessionTimeout = 10000
@@ -63,7 +64,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
private var admClient: AdminZkClient = _
// Kafka broker related configurations
- private val brokerHost = "127.0.0.1"
+ private val brokerHost = localHostNameForURI
private var brokerPort = 0
private var brokerConf: KafkaConfig = _
@@ -239,8 +240,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
private def brokerConfiguration: Properties = {
val props = new Properties()
props.put("broker.id", "0")
- props.put("host.name", "127.0.0.1")
- props.put("advertised.host.name", "127.0.0.1")
+ props.put("host.name", localHostNameForURI)
+ props.put("advertised.host.name", localHostNameForURI)
props.put("port", brokerPort.toString)
props.put("log.dir", brokerLogDir)
props.put("zookeeper.connect", zkAddress)
@@ -319,7 +320,8 @@ private[kafka010] class KafkaTestUtils extends Logging {
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
val (ip, port) = {
val splits = zkConnect.split(":")
- (splits(0), splits(1).toInt)
+ val port = splits(splits.length - 1)
+ (zkConnect.substring(0, zkConnect.length - port.length - 1), port.toInt)
}
val factory = new NIOServerCnxnFactory()
factory.configure(new InetSocketAddress(ip, port), 16)
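Both files also change how the ZooKeeper connect string is split into host and port: an IPv6 endpoint such as `[::1]:2181` contains colons inside the host part, so `split(":")(0)` no longer yields the host. Below is a self-contained sketch of the same last-colon parsing from the diff, with an illustrative `parseHostPort` helper name; only the text after the final colon is treated as the port.
```
object ParseHostPortExample {
  // Take everything after the last ':' as the port and the rest as the host,
  // so a bracketed IPv6 host such as "[::1]" survives intact.
  def parseHostPort(connect: String): (String, Int) = {
    val splits = connect.split(":")
    val port = splits(splits.length - 1)
    (connect.substring(0, connect.length - port.length - 1), port.toInt)
  }

  def main(args: Array[String]): Unit = {
    println(parseHostPort("127.0.0.1:2181")) // (127.0.0.1,2181)
    println(parseHostPort("[::1]:2181"))     // ([::1],2181)
  }
}
```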