You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2022/06/21 16:22:11 UTC
[spark] branch master updated: [SPARK-39542][YARN] Improve YARN client mode to support IPv6
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 678826e4829 [SPARK-39542][YARN] Improve YARN client mode to support IPv6
678826e4829 is described below
commit 678826e482967c7e56b04d28c9ad9aaccdb20de7
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Tue Jun 21 09:21:45 2022 -0700
[SPARK-39542][YARN] Improve YARN client mode to support IPv6
### What changes were proposed in this pull request?
This PR aims to improve YARN client mode to support IPv6.
### Why are the changes needed?
In an `IPv6`-only environment, the YARN module passes its unit tests only when `ExtendedYarnTest` suites are excluded. Among the failing extended tests, this PR focuses on `YARN client mode` first. Please note that the YARN module passes when IPv4 and IPv6 coexist.
**BEFORE**
```
% build/sbt "yarn/testOnly *.YarnClusterSuite -- -z yarn-client" -Pyarn
...
Using SPARK_LOCAL_HOSTNAME=[::1]
Using SPARK_LOCAL_IP=[::1]
...
[info] YarnClusterSuite:
[info] - run Spark in yarn-client mode *** FAILED *** (2 seconds, 144 milliseconds)
[info] FAILED did not equal FINISHED (stdout/stderr was not captured) (BaseYarnClusterSuite.scala:233)
...
```
**AFTER**
```
% build/sbt "yarn/testOnly *.YarnClusterSuite -- -z yarn-client" -Pyarn
...
Using SPARK_LOCAL_HOSTNAME=[::1]
Using SPARK_LOCAL_IP=[::1]
...
[info] YarnClusterSuite:
[info] - run Spark in yarn-client mode (10 seconds, 172 milliseconds)
[info] - run Spark in yarn-client mode with unmanaged am (7 seconds, 108 milliseconds)
[info] - run Spark in yarn-client mode with different configurations, ensuring redaction (8 seconds, 112 milliseconds)
[info] - SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local' (8 seconds, 118 milliseconds)
[info] - SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local' and gateway-replacement path (7 seconds, 115 milliseconds)
[info] - SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'file' (9 seconds, 104 milliseconds)
[info] - run Python application in yarn-client mode (11 seconds, 95 milliseconds)
[info] Run completed in 1 minute, 21 seconds.
[info] Total number of tests run: 7
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 7, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs.
Closes #36939 from dongjoon-hyun/SPARK-39542.
Authored-by: Dongjoon Hyun <do...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 4 ++--
.../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 ++
.../org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala | 11 +++++++++--
3 files changed, 13 insertions(+), 4 deletions(-)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 31c25688480..15b581cdcc1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -313,7 +313,7 @@ private[spark] class ApplicationMaster(
sparkConf.get(DRIVER_PORT)),
YarnSchedulerBackend.ENDPOINT_NAME)
// The client-mode AM doesn't listen for incoming connections, so report an invalid port.
- registerAM(Utils.localHostName, -1, sparkConf,
+ registerAM(Utils.localHostNameForURI(), -1, sparkConf,
sparkConf.getOption("spark.driver.appUIAddress"), appAttemptId)
val encodedAppId = URLEncoder.encode(appAttemptId.getApplicationId.toString, "UTF-8")
addAmIpFilter(Some(driverRef), s"/proxy/$encodedAppId")
@@ -542,7 +542,7 @@ private[spark] class ApplicationMaster(
}
private def runExecutorLauncher(): Unit = {
- val hostname = Utils.localHostName
+ val hostname = Utils.localHostNameForURI()
val amCores = sparkConf.get(AM_CORES)
val rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
amCores, true)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index f364b792160..66873961100 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -987,6 +987,8 @@ private[spark] class Client(
val javaOpts = ListBuffer[String]()
+ javaOpts += s"-Djava.net.preferIPv6Addresses=${Utils.preferIPv6}"
+
// SPARK-37106: To start AM with Java 17, `JavaModuleOptions.defaultModuleOptions`
// is added by default. It will not affect Java 8 and Java 11 due to existence of
// `-XX:+IgnoreUnrecognizedVMOptions`.
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 87ea44255cc..d8867848278 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -106,6 +106,9 @@ abstract class BaseYarnClusterSuite
yarnConf.set("yarn.scheduler.capacity.root.default.acl_administer_queue", "*")
yarnConf.setInt("yarn.scheduler.capacity.node-locality-delay", -1)
+ // Support both IPv4 and IPv6
+ yarnConf.set("yarn.resourcemanager.hostname", Utils.localHostNameForURI())
+
try {
yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
yarnCluster.init(yarnConf)
@@ -133,7 +136,7 @@ abstract class BaseYarnClusterSuite
// done so in a timely manner (defined to be 10 seconds).
val config = yarnCluster.getConfig()
val startTimeNs = System.nanoTime()
- while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") {
+ while (config.get(YarnConfiguration.RM_ADDRESS).split(":").last == "0") {
if (System.nanoTime() - startTimeNs > TimeUnit.SECONDS.toNanos(10)) {
throw new IllegalStateException("Timed out waiting for RM to come up.")
}
@@ -169,7 +172,9 @@ abstract class BaseYarnClusterSuite
outFile: Option[File] = None): SparkAppHandle.State = {
val deployMode = if (clientMode) "client" else "cluster"
val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf)
- val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv
+ val env = Map(
+ "YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath(),
+ "SPARK_PREFER_IPV6" -> Utils.preferIPv6.toString) ++ extraEnv
val launcher = new SparkLauncher(env.asJava)
if (klass.endsWith(".py")) {
@@ -182,6 +187,8 @@ abstract class BaseYarnClusterSuite
.setMaster("yarn")
.setDeployMode(deployMode)
.setConf(EXECUTOR_INSTANCES.key, "1")
+ .setConf(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS,
+ s"-Djava.net.preferIPv6Addresses=${Utils.preferIPv6}")
.setPropertiesFile(propsFile)
.addAppArgs(appArgs.toArray: _*)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org