You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2016/11/12 02:36:29 UTC

spark git commit: [SPARK-16759][CORE] Add a configuration property to pass caller contexts of upstream applications into Spark

Repository: spark
Updated Branches:
  refs/heads/master 46b2550bc -> 3af894511


[SPARK-16759][CORE] Add a configuration property to pass caller contexts of upstream applications into Spark

## What changes were proposed in this pull request?

Many applications take Spark as a computing engine and run on it. This PR adds a configuration property `spark.log.callerContext` that can be used by Spark's upstream applications (e.g. Oozie) to set up their caller contexts into Spark. In the end, Spark will combine its own caller context with the caller contexts of its upstream applications, and write them into Yarn RM log and HDFS audit log.

The audit log has a config to truncate the caller contexts passed in (default 128). The caller contexts will be sent over rpc, so it should be concise. The call context written into HDFS log and Yarn log consists of two parts: the information `A` specified by Spark itself and the value `B` of `spark.log.callerContext` property.  Currently `A` typically takes 64 to 74 characters,  so `B` can have up to 50 characters (mentioned in the doc `running-on-yarn.md`)
## How was this patch tested?

Manual tests. I have run some Spark applications with `spark.log.callerContext` configuration in Yarn client/cluster mode, and verified that the caller contexts were written into Yarn RM log and HDFS audit log correctly.

The ways to configure `spark.log.callerContext` property:
- In spark-defaults.conf:

```
spark.log.callerContext  infoSpecifiedByUpstreamApp
```
- In app's source code:

```
val spark = SparkSession
      .builder
      .appName("SparkKMeans")
      .config("spark.log.callerContext", "infoSpecifiedByUpstreamApp")
      .getOrCreate()
```

When running on Spark Yarn cluster mode, the driver is unable to pass 'spark.log.callerContext' to Yarn client and AM since Yarn client and AM have already started before the driver performs `.config("spark.log.callerContext", "infoSpecifiedByUpstreamApp")`.

The following  example shows the command line used to submit a SparkKMeans application and the corresponding records in Yarn RM log and HDFS audit log.

Command:

```
./bin/spark-submit --verbose --executor-cores 3 --num-executors 1 --master yarn --deploy-mode client --class org.apache.spark.examples.SparkKMeans examples/target/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar hdfs://localhost:9000/lr_big.txt 2 5
```

Yarn RM log:

<img width="1440" alt="screen shot 2016-10-19 at 9 12 03 pm" src="https://cloud.githubusercontent.com/assets/8546874/19547050/7d2f278c-9649-11e6-9df8-8d5ff12609f0.png">

HDFS audit log:

<img width="1400" alt="screen shot 2016-10-19 at 10 18 14 pm" src="https://cloud.githubusercontent.com/assets/8546874/19547102/096060ae-964a-11e6-981a-cb28efd5a058.png">

Author: Weiqing Yang <ya...@gmail.com>

Closes #15563 from weiqingy/SPARK-16759.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3af89451
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3af89451
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3af89451

Branch: refs/heads/master
Commit: 3af894511be6fcc17731e28b284dba432fe911f5
Parents: 46b2550
Author: Weiqing Yang <ya...@gmail.com>
Authored: Fri Nov 11 18:36:23 2016 -0800
Committer: Mridul Muralidharan <mr...@gmail.com>
Committed: Fri Nov 11 18:36:23 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/internal/config/package.scala  |  4 ++
 .../scala/org/apache/spark/scheduler/Task.scala | 13 ++++-
 .../scala/org/apache/spark/util/Utils.scala     | 53 ++++++++++++--------
 docs/configuration.md                           |  9 ++++
 .../spark/deploy/yarn/ApplicationMaster.scala   |  3 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |  3 +-
 6 files changed, 61 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3af89451/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 4a3e3d5..2951bdc 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -207,6 +207,10 @@ package object config {
     .booleanConf
     .createWithDefault(false)
 
+  private[spark] val APP_CALLER_CONTEXT = ConfigBuilder("spark.log.callerContext")
+    .stringConf
+    .createOptional
+
   private[spark] val FILES_MAX_PARTITION_BYTES = ConfigBuilder("spark.files.maxPartitionBytes")
     .doc("The maximum number of bytes to pack into a single partition when reading files.")
     .longConf

http://git-wip-us.apache.org/repos/asf/spark/blob/3af89451/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 9385e3c..d39651a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.HashMap
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config.APP_CALLER_CONTEXT
 import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.serializer.SerializerInstance
@@ -92,8 +93,16 @@ private[spark] abstract class Task[T](
       kill(interruptThread = false)
     }
 
-    new CallerContext("TASK", appId, appAttemptId, jobId, Option(stageId), Option(stageAttemptId),
-      Option(taskAttemptId), Option(attemptNumber)).setCurrentContext()
+    new CallerContext(
+      "TASK",
+      SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
+      appId,
+      appAttemptId,
+      jobId,
+      Option(stageId),
+      Option(stageAttemptId),
+      Option(taskAttemptId),
+      Option(attemptNumber)).setCurrentContext()
 
     try {
       runTask(context)

http://git-wip-us.apache.org/repos/asf/spark/blob/3af89451/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1de66af..c27cbe3 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2569,6 +2569,7 @@ private[util] object CallerContext extends Logging {
  * @param from who sets up the caller context (TASK, CLIENT, APPMASTER)
  *
  * The parameters below are optional:
+ * @param upstreamCallerContext caller context the upstream application passes in
  * @param appId id of the app this task belongs to
  * @param appAttemptId attempt id of the app this task belongs to
  * @param jobId id of the job this task belongs to
@@ -2578,26 +2579,38 @@ private[util] object CallerContext extends Logging {
  * @param taskAttemptNumber task attempt id
  */
 private[spark] class CallerContext(
-   from: String,
-   appId: Option[String] = None,
-   appAttemptId: Option[String] = None,
-   jobId: Option[Int] = None,
-   stageId: Option[Int] = None,
-   stageAttemptId: Option[Int] = None,
-   taskId: Option[Long] = None,
-   taskAttemptNumber: Option[Int] = None) extends Logging {
-
-   val appIdStr = if (appId.isDefined) s"_${appId.get}" else ""
-   val appAttemptIdStr = if (appAttemptId.isDefined) s"_${appAttemptId.get}" else ""
-   val jobIdStr = if (jobId.isDefined) s"_JId_${jobId.get}" else ""
-   val stageIdStr = if (stageId.isDefined) s"_SId_${stageId.get}" else ""
-   val stageAttemptIdStr = if (stageAttemptId.isDefined) s"_${stageAttemptId.get}" else ""
-   val taskIdStr = if (taskId.isDefined) s"_TId_${taskId.get}" else ""
-   val taskAttemptNumberStr =
-     if (taskAttemptNumber.isDefined) s"_${taskAttemptNumber.get}" else ""
-
-   val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
-     jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + taskAttemptNumberStr
+  from: String,
+  upstreamCallerContext: Option[String] = None,
+  appId: Option[String] = None,
+  appAttemptId: Option[String] = None,
+  jobId: Option[Int] = None,
+  stageId: Option[Int] = None,
+  stageAttemptId: Option[Int] = None,
+  taskId: Option[Long] = None,
+  taskAttemptNumber: Option[Int] = None) extends Logging {
+
+  private val context = prepareContext("SPARK_" +
+    from +
+    appId.map("_" + _).getOrElse("") +
+    appAttemptId.map("_" + _).getOrElse("") +
+    jobId.map("_JId_" + _).getOrElse("") +
+    stageId.map("_SId_" + _).getOrElse("") +
+    stageAttemptId.map("_" + _).getOrElse("") +
+    taskId.map("_TId_" + _).getOrElse("") +
+    taskAttemptNumber.map("_" + _).getOrElse("") +
+    upstreamCallerContext.map("_" + _).getOrElse(""))
+
+  private def prepareContext(context: String): String = {
+    // The default max size of Hadoop caller context is 128
+    lazy val len = SparkHadoopUtil.get.conf.getInt("hadoop.caller.context.max.size", 128)
+    if (context == null || context.length <= len) {
+      context
+    } else {
+      val finalContext = context.substring(0, len)
+      logWarning(s"Truncated Spark caller context from $context to $finalContext")
+      finalContext
+    }
+  }
 
   /**
    * Set up the caller context [[context]] by invoking Hadoop CallerContext API of

http://git-wip-us.apache.org/repos/asf/spark/blob/3af89451/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 41c1778..ea99592 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -202,6 +202,15 @@ of the most common options to set are:
     or remotely ("cluster") on one of the nodes inside the cluster.
   </td>
 </tr>
+<tr>
+  <td><code>spark.log.callerContext</code></td>
+  <td>(none)</td>
+  <td>
+    Application information that will be written into Yarn RM log/HDFS audit log when running on Yarn/HDFS.
+    Its length depends on the Hadoop configuration <code>hadoop.caller.context.max.size</code>. It should be concise,
+    and typically can have up to 50 characters.
+  </td>
+</tr>
 </table>
 
 Apart from these, the following properties are also available, and may be useful in some situations:

http://git-wip-us.apache.org/repos/asf/spark/blob/3af89451/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index f2b9dfb..918cc2d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -202,7 +202,8 @@ private[spark] class ApplicationMaster(
         attemptID = Option(appAttemptId.getAttemptId.toString)
       }
 
-      new CallerContext("APPMASTER",
+      new CallerContext(
+        "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
         Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()
 
       logInfo("ApplicationAttemptId: " + appAttemptId)

http://git-wip-us.apache.org/repos/asf/spark/blob/3af89451/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e77fa38..1b75688 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -161,7 +161,8 @@ private[spark] class Client(
       reportLauncherState(SparkAppHandle.State.SUBMITTED)
       launcherBackend.setAppId(appId.toString)
 
-      new CallerContext("CLIENT", Option(appId.toString)).setCurrentContext()
+      new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
+        Option(appId.toString)).setCurrentContext()
 
       // Verify whether the cluster has enough resources for our AM
       verifyClusterResources(newAppResponse)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org