You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/02/26 22:50:24 UTC

[spark] branch master updated: [SPARK-22860][CORE][YARN] Redact command line arguments for running Driver and Executor before logging (standalone and YARN)

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c17150a  [SPARK-22860][CORE][YARN] Redact command line arguments for running Driver and Executor before logging (standalone and YARN)
c17150a is described below

commit c17150a5f5a6c4f4a83ce8c055eab9fea78df08e
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Tue Feb 26 14:49:46 2019 -0800

    [SPARK-22860][CORE][YARN] Redact command line arguments for running Driver and Executor before logging (standalone and YARN)
    
    ## What changes were proposed in this pull request?
    
    This patch applies redaction to command line arguments before logging them. This applies to two resource managers: standalone cluster and YARN.
    
    This patch only covers arguments starting with `-D`, since Spark typically passes its configuration on the command line as `-Dspark.blabla=blabla`. Further changes are necessary if we also want to handle the `--conf spark.blabla=blabla` case.
    
    ## How was this patch tested?
    
    Added a unit test for the redaction logic. The rest of this patch only changes what gets logged, so it is not easy to add unit tests for it.
    
    Closes #23820 from HeartSaVioR/MINOR-redact-command-line-args-for-running-driver-executor.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../apache/spark/deploy/worker/DriverRunner.scala  |  9 ++--
 .../spark/deploy/worker/ExecutorRunner.scala       |  9 ++--
 .../main/scala/org/apache/spark/util/Utils.scala   | 13 ++++++
 .../scala/org/apache/spark/util/UtilsSuite.scala   | 52 ++++++++++++++++++++++
 .../spark/deploy/yarn/ExecutorRunnable.scala       |  2 +-
 5 files changed, 77 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 8c2a907..0c88119 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -193,8 +193,9 @@ private[deploy] class DriverRunner(
       CommandUtils.redirectStream(process.getInputStream, stdout)
 
       val stderr = new File(baseDir, "stderr")
-      val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
-      val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
+      val redactedCommand = Utils.redactCommandLineArgs(conf, builder.command.asScala)
+        .mkString("\"", "\" \"", "\"")
+      val header = "Launch Command: %s\n%s\n\n".format(redactedCommand, "=" * 40)
       Files.append(header, stderr, StandardCharsets.UTF_8)
       CommandUtils.redirectStream(process.getErrorStream, stderr)
     }
@@ -210,8 +211,10 @@ private[deploy] class DriverRunner(
     val successfulRunDuration = 5
     var keepTrying = !killed
 
+    val redactedCommand = Utils.redactCommandLineArgs(conf, command.command)
+      .mkString("\"", "\" \"", "\"")
     while (keepTrying) {
-      logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))
+      logInfo("Launch Command: " + redactedCommand)
 
       synchronized {
         if (killed) { return exitCode }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index c74a957..ead28f1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import com.google.common.io.Files
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.UI._
@@ -150,8 +150,9 @@ private[deploy] class ExecutorRunner(
       val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
         memory, sparkHome.getAbsolutePath, substituteVariables)
       val command = builder.command()
-      val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
-      logInfo(s"Launch command: $formattedCommand")
+      val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala)
+        .mkString("\"", "\" \"", "\"")
+      logInfo(s"Launch command: $redactedCommand")
 
       builder.directory(executorDir)
       builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
@@ -171,7 +172,7 @@ private[deploy] class ExecutorRunner(
 
       process = builder.start()
       val header = "Spark Executor Command: %s\n%s\n\n".format(
-        formattedCommand, "=" * 40)
+        redactedCommand, "=" * 40)
 
       // Redirect its stdout and stderr to files
       val stdout = new File(executorDir, "stdout")
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3065bdf..cade0dd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -98,6 +98,8 @@ private[spark] object Utils extends Logging {
   /** Scheme used for files that are locally available on worker nodes in the cluster. */
   val LOCAL_SCHEME = "local"
 
+  private val PATTERN_FOR_COMMAND_LINE_ARG = "-D(.+?)=(.+)".r
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -2617,6 +2619,17 @@ private[spark] object Utils extends Logging {
     redact(redactionPattern, kvs.toArray)
   }
 
+  def redactCommandLineArgs(conf: SparkConf, commands: Seq[String]): Seq[String] = {
+    val redactionPattern = conf.get(SECRET_REDACTION_PATTERN)
+    commands.map {
+      case PATTERN_FOR_COMMAND_LINE_ARG(key, value) =>
+        val (_, newValue) = redact(redactionPattern, Seq((key, value))).head
+        s"-D$key=$newValue"
+
+      case cmd => cmd
+    }
+  }
+
   def stringToSeq(str: String): Seq[String] = {
     str.split(",").map(_.trim()).filter(_.nonEmpty)
   }
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 6c2159e..fdd9771 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1016,6 +1016,58 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
 
   }
 
+  test("redact sensitive information in command line args") {
+    val sparkConf = new SparkConf
+
+    // Set some secret keys
+    val secretKeysWithSameValue = Seq(
+      "spark.executorEnv.HADOOP_CREDSTORE_PASSWORD",
+      "spark.my.password",
+      "spark.my.sECreT")
+    val cmdArgsForSecretWithSameValue = secretKeysWithSameValue.map(s => s"-D$s=sensitive_value")
+
+    val secretKeys = secretKeysWithSameValue ++ Seq("spark.your.password")
+    val cmdArgsForSecret = cmdArgsForSecretWithSameValue ++ Seq(
+      // Have '=' twice
+      "-Dspark.your.password=sensitive=sensitive2"
+    )
+
+    val ignoredArgs = Seq(
+      // starts with -D but no assignment
+      "-Ddummy",
+      // secret value contained not starting with -D (we don't care about this case for now)
+      "spark.my.password=sensitive_value",
+      // edge case: not started with -D, but matched pattern after first '-'
+      "--Dspark.my.password=sensitive_value")
+
+    val cmdArgs = cmdArgsForSecret ++ ignoredArgs ++ Seq(
+      // Set a non-secret key
+      "-Dspark.regular.property=regular_value",
+      // Set a property with a regular key but secret in the value
+      "-Dspark.sensitive.property=has_secret_in_value")
+
+    // Redact sensitive information
+    val redactedCmdArgs = Utils.redactCommandLineArgs(sparkConf, cmdArgs)
+
+    // These arguments should be left as they were:
+    // 1) argument without -D option is not applied
+    // 2) -D option without key-value assignment is not applied
+    assert(ignoredArgs.forall(redactedCmdArgs.contains))
+
+    val redactedCmdArgMap = redactedCmdArgs.filterNot(ignoredArgs.contains).map { cmd =>
+      val keyValue = cmd.substring("-D".length).split("=")
+      keyValue(0) -> keyValue.tail.mkString("=")
+    }.toMap
+
+    // Assert that secret information got redacted while the regular property remained the same
+    secretKeys.foreach { key =>
+      assert(redactedCmdArgMap(key) === Utils.REDACTION_REPLACEMENT_TEXT)
+    }
+
+    assert(redactedCmdArgMap("spark.regular.property") === "regular_value")
+    assert(redactedCmdArgMap("spark.sensitive.property") === Utils.REDACTION_REPLACEMENT_TEXT)
+  }
+
   test("tryWithSafeFinally") {
     var e = new Error("Block0")
     val finallyBlockError = new Error("Finally Block")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 2f8f2a0..7046ad7 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -76,7 +76,7 @@ private[yarn] class ExecutorRunnable(
     |  env:
     |${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s"    $k -> $v\n" }.mkString}
     |  command:
-    |    ${commands.mkString(" \\ \n      ")}
+    |    ${Utils.redactCommandLineArgs(sparkConf, commands).mkString(" \\ \n      ")}
     |
     |  resources:
     |${localResources.map { case (k, v) => s"    $k -> $v\n" }.mkString}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org