Posted to commits@spark.apache.org by do...@apache.org on 2022/06/21 16:22:11 UTC

[spark] branch master updated: [SPARK-39542][YARN] Improve YARN client mode to support IPv6

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 678826e4829 [SPARK-39542][YARN] Improve YARN client mode to support IPv6
678826e4829 is described below

commit 678826e482967c7e56b04d28c9ad9aaccdb20de7
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Tue Jun 21 09:21:45 2022 -0700

    [SPARK-39542][YARN] Improve YARN client mode to support IPv6
    
    ### What changes were proposed in this pull request?
    
    This PR aims to improve YARN client mode to support IPv6.
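    
    The diff below switches the client-mode AM from `Utils.localHostName` to `Utils.localHostNameForURI()` and propagates `java.net.preferIPv6Addresses` to the AM JVM. As a rough illustration of why a URI-safe host string matters, the hypothetical snippet below (not Spark's actual `Utils` code) brackets IPv6 literals so they can be joined with a port:
    
    ```
    import java.net.{Inet6Address, InetAddress}
    
    object HostForURIExample {
      // Hypothetical helper, for illustration only: bracket IPv6 literals so that
      // "host:port" strings and URIs stay parseable.
      def hostForURI(addr: InetAddress): String = addr match {
        case v6: Inet6Address => s"[${v6.getHostAddress}]"
        case other => other.getHostAddress
      }
    
      def main(args: Array[String]): Unit = {
        val loopback = InetAddress.getByName("::1")
        // Prints "[0:0:0:0:0:0:0:1]:4040" -- a valid authority; without the brackets
        // the extra colons would make the port position ambiguous.
        println(s"${hostForURI(loopback)}:4040")
      }
    }
    ```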
    
    ### Why are the changes needed?
    
    In an `IPv6`-only environment, the YARN module passes the UTs if we exclude `ExtendedYarnTest`. Among the failures, this PR focuses on `YARN client mode` first. Please note that the YARN module passes if IPv4 and IPv6 coexist.
    
    **BEFORE**
    ```
    % build/sbt "yarn/testOnly *.YarnClusterSuite -- -z yarn-client" -Pyarn
    ...
    Using SPARK_LOCAL_HOSTNAME=[::1]
    Using SPARK_LOCAL_IP=[::1]
    ...
    [info] YarnClusterSuite:
    [info] - run Spark in yarn-client mode *** FAILED *** (2 seconds, 144 milliseconds)
    [info]   FAILED did not equal FINISHED (stdout/stderr was not captured) (BaseYarnClusterSuite.scala:233)
    ...
    ```
    
    **AFTER**
    ```
    % build/sbt "yarn/testOnly *.YarnClusterSuite -- -z yarn-client" -Pyarn
    ...
    Using SPARK_LOCAL_HOSTNAME=[::1]
    Using SPARK_LOCAL_IP=[::1]
    ...
    [info] YarnClusterSuite:
    [info] - run Spark in yarn-client mode (10 seconds, 172 milliseconds)
    [info] - run Spark in yarn-client mode with unmanaged am (7 seconds, 108 milliseconds)
    [info] - run Spark in yarn-client mode with different configurations, ensuring redaction (8 seconds, 112 milliseconds)
    [info] - SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local' (8 seconds, 118 milliseconds)
    [info] - SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local' and gateway-replacement path (7 seconds, 115 milliseconds)
    [info] - SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'file' (9 seconds, 104 milliseconds)
    [info] - run Python application in yarn-client mode (11 seconds, 95 milliseconds)
    [info] Run completed in 1 minute, 21 seconds.
    [info] Total number of tests run: 7
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 7, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
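    
    The diff below also adjusts how the test suite parses `yarn.resourcemanager.address` while waiting for the MiniYARNCluster RM to bind a port: with a bracketed IPv6 host the address contains extra colons, so `split(":")(1)` no longer selects the port, whereas `split(":").last` does. A minimal, self-contained sketch of the difference (independent of the Spark test code):
    
    ```
    object RmAddressPortCheck {
      // Illustrative only: true while the RM address still reports the unbound port 0.
      def portIsUnassigned(rmAddress: String): Boolean =
        rmAddress.split(":").last == "0"
    
      def main(args: Array[String]): Unit = {
        val ipv4 = "127.0.0.1:0"
        val ipv6 = "[::1]:0"
        println(ipv4.split(":")(1))      // "0" -- the old index-based check works for IPv4
        println(ipv6.split(":")(1))      // ""  -- but breaks on a bracketed IPv6 literal
        println(portIsUnassigned(ipv4))  // true
        println(portIsUnassigned(ipv6))  // true -- .last finds the port either way
      }
    }
    ```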
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    Closes #36939 from dongjoon-hyun/SPARK-39542.
    
    Authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../org/apache/spark/deploy/yarn/ApplicationMaster.scala      |  4 ++--
 .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala  |  2 ++
 .../org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala   | 11 +++++++++--
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 31c25688480..15b581cdcc1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -313,7 +313,7 @@ private[spark] class ApplicationMaster(
           sparkConf.get(DRIVER_PORT)),
         YarnSchedulerBackend.ENDPOINT_NAME)
       // The client-mode AM doesn't listen for incoming connections, so report an invalid port.
-      registerAM(Utils.localHostName, -1, sparkConf,
+      registerAM(Utils.localHostNameForURI(), -1, sparkConf,
         sparkConf.getOption("spark.driver.appUIAddress"), appAttemptId)
       val encodedAppId = URLEncoder.encode(appAttemptId.getApplicationId.toString, "UTF-8")
       addAmIpFilter(Some(driverRef), s"/proxy/$encodedAppId")
@@ -542,7 +542,7 @@ private[spark] class ApplicationMaster(
   }
 
   private def runExecutorLauncher(): Unit = {
-    val hostname = Utils.localHostName
+    val hostname = Utils.localHostNameForURI()
     val amCores = sparkConf.get(AM_CORES)
     val rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
       amCores, true)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index f364b792160..66873961100 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -987,6 +987,8 @@ private[spark] class Client(
 
     val javaOpts = ListBuffer[String]()
 
+    javaOpts += s"-Djava.net.preferIPv6Addresses=${Utils.preferIPv6}"
+
     // SPARK-37106: To start AM with Java 17, `JavaModuleOptions.defaultModuleOptions`
     // is added by default. It will not affect Java 8 and Java 11 due to existence of
     // `-XX:+IgnoreUnrecognizedVMOptions`.
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 87ea44255cc..d8867848278 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -106,6 +106,9 @@ abstract class BaseYarnClusterSuite
     yarnConf.set("yarn.scheduler.capacity.root.default.acl_administer_queue", "*")
     yarnConf.setInt("yarn.scheduler.capacity.node-locality-delay", -1)
 
+    // Support both IPv4 and IPv6
+    yarnConf.set("yarn.resourcemanager.hostname", Utils.localHostNameForURI())
+
     try {
       yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
       yarnCluster.init(yarnConf)
@@ -133,7 +136,7 @@ abstract class BaseYarnClusterSuite
     // done so in a timely manner (defined to be 10 seconds).
     val config = yarnCluster.getConfig()
     val startTimeNs = System.nanoTime()
-    while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") {
+    while (config.get(YarnConfiguration.RM_ADDRESS).split(":").last == "0") {
       if (System.nanoTime() - startTimeNs > TimeUnit.SECONDS.toNanos(10)) {
         throw new IllegalStateException("Timed out waiting for RM to come up.")
       }
@@ -169,7 +172,9 @@ abstract class BaseYarnClusterSuite
       outFile: Option[File] = None): SparkAppHandle.State = {
     val deployMode = if (clientMode) "client" else "cluster"
     val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf)
-    val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv
+    val env = Map(
+      "YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath(),
+      "SPARK_PREFER_IPV6" -> Utils.preferIPv6.toString) ++ extraEnv
 
     val launcher = new SparkLauncher(env.asJava)
     if (klass.endsWith(".py")) {
@@ -182,6 +187,8 @@ abstract class BaseYarnClusterSuite
       .setMaster("yarn")
       .setDeployMode(deployMode)
       .setConf(EXECUTOR_INSTANCES.key, "1")
+      .setConf(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS,
+        s"-Djava.net.preferIPv6Addresses=${Utils.preferIPv6}")
       .setPropertiesFile(propsFile)
       .addAppArgs(appArgs.toArray: _*)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org