You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/02/15 20:44:33 UTC

[spark] branch master updated: [SPARK-26790][CORE] Change approach for retrieving executor logs and attributes: self-retrieve

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b6c6875  [SPARK-26790][CORE] Change approach for retrieving executor logs and attributes: self-retrieve
b6c6875 is described below

commit b6c68755715e36f199c172443862ca4bfde3ced5
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Fri Feb 15 12:44:14 2019 -0800

    [SPARK-26790][CORE] Change approach for retrieving executor logs and attributes: self-retrieve
    
    ## What changes were proposed in this pull request?
    
    This patch proposes to change the approach on extracting log urls as well as attributes from YARN executor:
    
    - AS-IS: extract information from the `Container` API and include it in the container launch context
    - TO-BE: let the YARN executor extract the information itself
    
    This approach leads us to populate more attributes like nodemanager's IPC port which can let us configure custom log url to JHS log url directly.
    
    ## How was this patch tested?
    
    Existing unit tests.
    
    Closes #23706 from HeartSaVioR/SPARK-26790.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../executor/CoarseGrainedExecutorBackend.scala    | 52 +++++++++------
 docs/running-on-yarn.md                            | 18 +++++
 .../spark/deploy/yarn/ExecutorRunnable.scala       | 17 +----
 .../YarnCoarseGrainedExecutorBackend.scala         | 77 ++++++++++++++++++++++
 .../spark/util/YarnContainerInfoHelper.scala       | 22 ++++++-
 .../spark/deploy/yarn/YarnClusterSuite.scala       |  3 +
 6 files changed, 151 insertions(+), 38 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 2865c3b..645f587 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -181,33 +181,47 @@ private[spark] class CoarseGrainedExecutorBackend(
 
 private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
-  private def run(
+  case class Arguments(
       driverUrl: String,
       executorId: String,
       hostname: String,
       cores: Int,
       appId: String,
       workerUrl: Option[String],
-      userClassPath: Seq[URL]) {
+      userClassPath: mutable.ListBuffer[URL])
+
+  def main(args: Array[String]): Unit = {
+    val createFn: (RpcEnv, Arguments, SparkEnv) =>
+      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
+      new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
+        arguments.hostname, arguments.cores, arguments.userClassPath, env)
+    }
+    run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
+    System.exit(0)
+  }
+
+  def run(
+      arguments: Arguments,
+      backendCreateFn: (RpcEnv, Arguments, SparkEnv) => CoarseGrainedExecutorBackend): Unit = {
 
     Utils.initDaemon(log)
 
     SparkHadoopUtil.get.runAsSparkUser { () =>
       // Debug code
-      Utils.checkHost(hostname)
+      Utils.checkHost(arguments.hostname)
 
       // Bootstrap to fetch the driver's Spark properties.
       val executorConf = new SparkConf
       val fetcher = RpcEnv.create(
         "driverPropsFetcher",
-        hostname,
+        arguments.hostname,
         -1,
         executorConf,
         new SecurityManager(executorConf),
         clientMode = true)
-      val driver = fetcher.setupEndpointRefByURI(driverUrl)
+      val driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
       val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
-      val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
+      val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
       fetcher.shutdown()
 
       // Create SparkEnv using properties we fetched from the driver.
@@ -225,19 +239,18 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
       }
 
-      val env = SparkEnv.createExecutorEnv(
-        driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
+      val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.hostname,
+        arguments.cores, cfg.ioEncryptionKey, isLocal = false)
 
-      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
-        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
-      workerUrl.foreach { url =>
+      env.rpcEnv.setupEndpoint("Executor", backendCreateFn(env.rpcEnv, arguments, env))
+      arguments.workerUrl.foreach { url =>
         env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
       }
       env.rpcEnv.awaitTermination()
     }
   }
 
-  def main(args: Array[String]) {
+  def parseArguments(args: Array[String], classNameForEntry: String): Arguments = {
     var driverUrl: String = null
     var executorId: String = null
     var hostname: String = null
@@ -276,24 +289,24 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
           // scalastyle:off println
           System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
           // scalastyle:on println
-          printUsageAndExit()
+          printUsageAndExit(classNameForEntry)
       }
     }
 
     if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
       appId == null) {
-      printUsageAndExit()
+      printUsageAndExit(classNameForEntry)
     }
 
-    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
-    System.exit(0)
+    Arguments(driverUrl, executorId, hostname, cores, appId, workerUrl,
+      userClassPath)
   }
 
-  private def printUsageAndExit() = {
+  private def printUsageAndExit(classNameForEntry: String): Unit = {
     // scalastyle:off println
     System.err.println(
-      """
-      |Usage: CoarseGrainedExecutorBackend [options]
+      s"""
+      |Usage: $classNameForEntry [options]
       |
       | Options are:
       |   --driver-url <driverUrl>
@@ -307,5 +320,4 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     // scalastyle:on println
     System.exit(1)
   }
-
 }
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 51a13d3..8f1a127 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -481,6 +481,18 @@ To use a custom metrics.properties for the application master and executors, upd
       <td>`http://` or `https://` according to YARN HTTP policy. (Configured via `yarn.http.policy`)</td>
     </tr>
     <tr>
+      <td>{{NM_HOST}}</td>
+      <td>The "host" of node where container was run.</td>
+    </tr>
+    <tr>
+      <td>{{NM_PORT}}</td>
+      <td>The "port" of node manager where container was run.</td>
+    </tr>
+    <tr>
+      <td>{{NM_HTTP_PORT}}</td>
+      <td>The "port" of node manager's http server where container was run.</td>
+    </tr>
+    <tr>
       <td>{{NM_HTTP_ADDRESS}}</td>
       <td>Http URI of the node on which the container is allocated.</td>
     </tr>
@@ -502,6 +514,12 @@ To use a custom metrics.properties for the application master and executors, upd
     </tr>
 </table>
 
+For example, to point the log URL link directly to the Job History Server instead of letting the NodeManager HTTP server redirect it, you can configure `spark.history.custom.executor.log.url` as below:
+
+ `{{HTTP_SCHEME}}<JHS_HOST>:<JHS_PORT>/jobhistory/logs/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}?start=-4096`
+
+ NOTE: you need to replace `<JHS_HOST>` and `<JHS_PORT>` with actual values.
+
 # Important notes
 
 - Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 0b909d1..2f8f2a0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -202,7 +202,7 @@ private[yarn] class ExecutorRunnable(
     val commands = prefixEnv ++
       Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
       javaOpts ++
-      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
+      Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
         "--driver-url", masterAddress,
         "--executor-id", executorId,
         "--hostname", hostname,
@@ -235,21 +235,6 @@ private[yarn] class ExecutorRunnable(
       }
     }
 
-    // Add log urls, as well as executor attributes
-    container.foreach { c =>
-      YarnContainerInfoHelper.getLogUrls(conf, Some(c)).foreach { m =>
-        m.foreach { case (fileName, url) =>
-          env("SPARK_LOG_URL_" + fileName.toUpperCase(Locale.ROOT)) = url
-        }
-      }
-
-      YarnContainerInfoHelper.getAttributes(conf, Some(c)).foreach { m =>
-        m.foreach { case (attr, value) =>
-          env("SPARK_EXECUTOR_ATTRIBUTE_" + attr.toUpperCase(Locale.ROOT)) = value
-        }
-      }
-    }
-
     env
   }
 }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
new file mode 100644
index 0000000..53e99d9
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.net.URL
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.util.YarnContainerInfoHelper
+
+/**
+ * Custom implementation of CoarseGrainedExecutorBackend for YARN resource manager.
+ * This class extracts executor log URLs and executor attributes from the system environment
+ * variables that YARN sets for the container.
+ */
+private[spark] class YarnCoarseGrainedExecutorBackend(
+    rpcEnv: RpcEnv,
+    driverUrl: String,
+    executorId: String,
+    hostname: String,
+    cores: Int,
+    userClassPath: Seq[URL],
+    env: SparkEnv)
+  extends CoarseGrainedExecutorBackend(
+    rpcEnv,
+    driverUrl,
+    executorId,
+    hostname,
+    cores,
+    userClassPath,
+    env) with Logging {
+
+  private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)
+
+  override def extractLogUrls: Map[String, String] = {
+    YarnContainerInfoHelper.getLogUrls(hadoopConfiguration, container = None)
+      .getOrElse(Map())
+  }
+
+  override def extractAttributes: Map[String, String] = {
+    YarnContainerInfoHelper.getAttributes(hadoopConfiguration, container = None)
+      .getOrElse(Map())
+  }
+}
+
+private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
+      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
+      new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
+        arguments.hostname, arguments.cores, arguments.userClassPath, env)
+    }
+    val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
+      this.getClass.getCanonicalName.stripSuffix("$"))
+    CoarseGrainedExecutorBackend.run(backendArgs, createFn)
+    System.exit(0)
+  }
+
+}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala
index 96350cd..5e39422 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala
@@ -59,6 +59,9 @@ private[spark] object YarnContainerInfoHelper extends Logging {
       val yarnConf = new YarnConfiguration(conf)
       Some(Map(
         "HTTP_SCHEME" -> getYarnHttpScheme(yarnConf),
+        "NM_HOST" -> getNodeManagerHost(container),
+        "NM_PORT" -> getNodeManagerPort(container),
+        "NM_HTTP_PORT" -> getNodeManagerHttpPort(container),
         "NM_HTTP_ADDRESS" -> getNodeManagerHttpAddress(container),
         "CLUSTER_ID" -> getClusterId(yarnConf).getOrElse(""),
         "CONTAINER_ID" -> ConverterUtils.toString(getContainerId(container)),
@@ -97,7 +100,22 @@ private[spark] object YarnContainerInfoHelper extends Logging {
 
   def getNodeManagerHttpAddress(container: Option[Container]): String = container match {
     case Some(c) => c.getNodeHttpAddress
-    case None => System.getenv(Environment.NM_HOST.name()) + ":" +
-      System.getenv(Environment.NM_HTTP_PORT.name())
+    case None => getNodeManagerHost(None) + ":" + getNodeManagerHttpPort(None)
   }
+
+  def getNodeManagerHost(container: Option[Container]): String = container match {
+    case Some(c) => c.getNodeHttpAddress.split(":")(0)
+    case None => System.getenv(Environment.NM_HOST.name())
+  }
+
+  def getNodeManagerHttpPort(container: Option[Container]): String = container match {
+    case Some(c) => c.getNodeHttpAddress.split(":")(1)
+    case None => System.getenv(Environment.NM_HTTP_PORT.name())
+  }
+
+  def getNodeManagerPort(container: Option[Container]): String = container match {
+    case Some(_) => "-1" // Just return invalid port given we cannot retrieve the value
+    case None => System.getenv(Environment.NM_PORT.name())
+  }
+
 }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index b3c5bbd..56b7dfc 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -477,6 +477,9 @@ private object YarnClusterDriver extends Logging with Matchers {
         val driverAttributes = listener.driverAttributes.get
         val expectationAttributes = Map(
           "HTTP_SCHEME" -> YarnContainerInfoHelper.getYarnHttpScheme(yarnConf),
+          "NM_HOST" -> YarnContainerInfoHelper.getNodeManagerHost(container = None),
+          "NM_PORT" -> YarnContainerInfoHelper.getNodeManagerPort(container = None),
+          "NM_HTTP_PORT" -> YarnContainerInfoHelper.getNodeManagerHttpPort(container = None),
           "NM_HTTP_ADDRESS" -> YarnContainerInfoHelper.getNodeManagerHttpAddress(container = None),
           "CLUSTER_ID" -> YarnContainerInfoHelper.getClusterId(yarnConf).getOrElse(""),
           "CONTAINER_ID" -> ConverterUtils.toString(containerId),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org