Posted to commits@spark.apache.org by va...@apache.org on 2019/02/26 22:50:24 UTC
[spark] branch master updated: [SPARK-22860][CORE][YARN] Redact command line arguments for running Driver and Executor before logging (standalone and YARN)
This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c17150a [SPARK-22860][CORE][YARN] Redact command line arguments for running Driver and Executor before logging (standalone and YARN)
c17150a is described below
commit c17150a5f5a6c4f4a83ce8c055eab9fea78df08e
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Tue Feb 26 14:49:46 2019 -0800
[SPARK-22860][CORE][YARN] Redact command line arguments for running Driver and Executor before logging (standalone and YARN)
## What changes were proposed in this pull request?
This patch applies redaction to command line arguments before logging them. It covers two resource managers: standalone cluster and YARN.
The patch only handles arguments starting with `-D`, since Spark typically passes Spark configuration entries on the command line as `-Dspark.blabla=blabla`. Further changes would be needed to also handle the `--conf spark.blabla=blabla` case.
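To make the behavior concrete, here is a minimal, self-contained sketch of the approach (not the patch code itself): the regex and replacement text below are illustrative stand-ins for the `spark.redaction.regex` default and `Utils.REDACTION_REPLACEMENT_TEXT`.

    object RedactionSketch {
      // Matches "-Dkey=value"; the reluctant ".+?" stops at the first '='
      // so values containing '=' stay intact.
      private val CommandLineArg = "-D(.+?)=(.+)".r

      // Illustrative stand-ins; Spark reads the pattern from
      // spark.redaction.regex and uses Utils.REDACTION_REPLACEMENT_TEXT.
      private val secretPattern = "(?i)secret|password".r
      private val replacement = "*********(redacted)"

      def redactCommandLineArgs(commands: Seq[String]): Seq[String] =
        commands.map {
          // Note: the real Utils.redact also matches the *value* against
          // the pattern; this sketch checks only the key, for brevity.
          case CommandLineArg(key, value) if secretPattern.findFirstIn(key).isDefined =>
            s"-D$key=$replacement"
          case cmd => cmd
        }

      def main(args: Array[String]): Unit = {
        redactCommandLineArgs(Seq(
          "-Dspark.my.password=sensitive=value",
          "-Dspark.regular.property=regular_value",
          "-Ddummy" // no '=' assignment, left as-is
        )).foreach(println)
        // -Dspark.my.password=*********(redacted)
        // -Dspark.regular.property=regular_value
        // -Ddummy
      }
    }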
## How was this patch tested?
Added a unit test for the redaction logic. Beyond that, the patch only changes how the command is logged, which is hard to cover with a unit test.
Closes #23820 from HeartSaVioR/MINOR-redact-command-line-args-for-running-driver-executor.
Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
.../apache/spark/deploy/worker/DriverRunner.scala | 9 ++--
.../spark/deploy/worker/ExecutorRunner.scala | 9 ++--
.../main/scala/org/apache/spark/util/Utils.scala | 13 ++++++
.../scala/org/apache/spark/util/UtilsSuite.scala | 52 ++++++++++++++++++++++
.../spark/deploy/yarn/ExecutorRunnable.scala | 2 +-
5 files changed, 77 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 8c2a907..0c88119 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -193,8 +193,9 @@ private[deploy] class DriverRunner(
CommandUtils.redirectStream(process.getInputStream, stdout)
val stderr = new File(baseDir, "stderr")
- val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
- val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
+ val redactedCommand = Utils.redactCommandLineArgs(conf, builder.command.asScala)
+ .mkString("\"", "\" \"", "\"")
+ val header = "Launch Command: %s\n%s\n\n".format(redactedCommand, "=" * 40)
Files.append(header, stderr, StandardCharsets.UTF_8)
CommandUtils.redirectStream(process.getErrorStream, stderr)
}
@@ -210,8 +211,10 @@ private[deploy] class DriverRunner(
val successfulRunDuration = 5
var keepTrying = !killed
+ val redactedCommand = Utils.redactCommandLineArgs(conf, command.command)
+ .mkString("\"", "\" \"", "\"")
while (keepTrying) {
- logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))
+ logInfo("Launch Command: " + redactedCommand)
synchronized {
if (killed) { return exitCode }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index c74a957..ead28f1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import com.google.common.io.Files
import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
+import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.UI._
@@ -150,8 +150,9 @@ private[deploy] class ExecutorRunner(
val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
- val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
- logInfo(s"Launch command: $formattedCommand")
+ val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala)
+ .mkString("\"", "\" \"", "\"")
+ logInfo(s"Launch command: $redactedCommand")
builder.directory(executorDir)
builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
@@ -171,7 +172,7 @@ private[deploy] class ExecutorRunner(
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
- formattedCommand, "=" * 40)
+ redactedCommand, "=" * 40)
// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3065bdf..cade0dd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -98,6 +98,8 @@ private[spark] object Utils extends Logging {
/** Scheme used for files that are locally available on worker nodes in the cluster. */
val LOCAL_SCHEME = "local"
+ private val PATTERN_FOR_COMMAND_LINE_ARG = "-D(.+?)=(.+)".r
+
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -2617,6 +2619,17 @@ private[spark] object Utils extends Logging {
redact(redactionPattern, kvs.toArray)
}
+ def redactCommandLineArgs(conf: SparkConf, commands: Seq[String]): Seq[String] = {
+ val redactionPattern = conf.get(SECRET_REDACTION_PATTERN)
+ commands.map {
+ case PATTERN_FOR_COMMAND_LINE_ARG(key, value) =>
+ val (_, newValue) = redact(redactionPattern, Seq((key, value))).head
+ s"-D$key=$newValue"
+
+ case cmd => cmd
+ }
+ }
+
def stringToSeq(str: String): Seq[String] = {
str.split(",").map(_.trim()).filter(_.nonEmpty)
}
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 6c2159e..fdd9771 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1016,6 +1016,58 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
}
+ test("redact sensitive information in command line args") {
+ val sparkConf = new SparkConf
+
+ // Set some secret keys
+ val secretKeysWithSameValue = Seq(
+ "spark.executorEnv.HADOOP_CREDSTORE_PASSWORD",
+ "spark.my.password",
+ "spark.my.sECreT")
+ val cmdArgsForSecretWithSameValue = secretKeysWithSameValue.map(s => s"-D$s=sensitive_value")
+
+ val secretKeys = secretKeysWithSameValue ++ Seq("spark.your.password")
+ val cmdArgsForSecret = cmdArgsForSecretWithSameValue ++ Seq(
+ // Have '=' twice
+ "-Dspark.your.password=sensitive=sensitive2"
+ )
+
+ val ignoredArgs = Seq(
+ // starts with -D but no assignment
+ "-Ddummy",
+ // secret value contained not starting with -D (we don't care about this case for now)
+ "spark.my.password=sensitive_value",
+ // edge case: not started with -D, but matched pattern after first '-'
+ "--Dspark.my.password=sensitive_value")
+
+ val cmdArgs = cmdArgsForSecret ++ ignoredArgs ++ Seq(
+ // Set a non-secret key
+ "-Dspark.regular.property=regular_value",
+ // Set a property with a regular key but secret in the value
+ "-Dspark.sensitive.property=has_secret_in_value")
+
+ // Redact sensitive information
+ val redactedCmdArgs = Utils.redactCommandLineArgs(sparkConf, cmdArgs)
+
+ // These arguments should be left as they were:
+ // 1) argument without -D option is not applied
+ // 2) -D option without key-value assignment is not applied
+ assert(ignoredArgs.forall(redactedCmdArgs.contains))
+
+ val redactedCmdArgMap = redactedCmdArgs.filterNot(ignoredArgs.contains).map { cmd =>
+ val keyValue = cmd.substring("-D".length).split("=")
+ keyValue(0) -> keyValue.tail.mkString("=")
+ }.toMap
+
+ // Assert that secret information got redacted while the regular property remained the same
+ secretKeys.foreach { key =>
+ assert(redactedCmdArgMap(key) === Utils.REDACTION_REPLACEMENT_TEXT)
+ }
+
+ assert(redactedCmdArgMap("spark.regular.property") === "regular_value")
+ assert(redactedCmdArgMap("spark.sensitive.property") === Utils.REDACTION_REPLACEMENT_TEXT)
+ }
+
test("tryWithSafeFinally") {
var e = new Error("Block0")
val finallyBlockError = new Error("Finally Block")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 2f8f2a0..7046ad7 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -76,7 +76,7 @@ private[yarn] class ExecutorRunnable(
| env:
|${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s" $k -> $v\n" }.mkString}
| command:
- | ${commands.mkString(" \\ \n ")}
+ | ${Utils.redactCommandLineArgs(sparkConf, commands).mkString(" \\ \n ")}
|
| resources:
|${localResources.map { case (k, v) => s" $k -> $v\n" }.mkString}
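For reference, a usage sketch of the new helper as the test above exercises it. `Utils` is `private[spark]`, so this only compiles inside an `org.apache.spark` package (the package name here is a hypothetical placeholder), and the expected output assumes the default `spark.redaction.regex` value of `(?i)secret|password`.

    package org.apache.spark.demo // hypothetical, to get private[spark] access

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    object RedactDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        // Keys matching the redaction pattern have their values masked.
        Utils.redactCommandLineArgs(conf, Seq(
          "-Dspark.executorEnv.HADOOP_CREDSTORE_PASSWORD=hunter2",
          "-Dspark.regular.property=regular_value")).foreach(println)
        // -Dspark.executorEnv.HADOOP_CREDSTORE_PASSWORD=*********(redacted)
        // -Dspark.regular.property=regular_value
      }
    }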