You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/03/24 22:52:54 UTC

spark git commit: [SPARK-20070][SQL] Redact DataSourceScanExec treeString

Repository: spark
Updated Branches:
  refs/heads/master e8810b73c -> 91fa80fe8


[SPARK-20070][SQL] Redact DataSourceScanExec treeString

## What changes were proposed in this pull request?
The explain output of `DataSourceScanExec` can contain sensitive information (like Amazon keys). Such information should not end up in logs or be exposed to non-privileged users.

This PR addresses this by adding a redaction facility for the `DataSourceScanExec.treeString`. A user can enable this by setting a regex in the `spark.redaction.string.regex` configuration.

## How was this patch tested?
Added a unit test to check the output of DataSourceScanExec.

Author: Herman van Hovell <hv...@databricks.com>

Closes #17397 from hvanhovell/SPARK-20070.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/91fa80fe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/91fa80fe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/91fa80fe

Branch: refs/heads/master
Commit: 91fa80fe8a2480d64c430bd10f97b3d44c007bcc
Parents: e8810b7
Author: Herman van Hovell <hv...@databricks.com>
Authored: Fri Mar 24 15:52:48 2017 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Fri Mar 24 15:52:48 2017 -0700

----------------------------------------------------------------------
 .../spark/internal/config/ConfigBuilder.scala   | 13 +++++
 .../apache/spark/internal/config/package.scala  | 12 +++-
 .../scala/org/apache/spark/util/Utils.scala     | 17 +++++-
 .../internal/config/ConfigEntrySuite.scala      | 19 +++++--
 .../sql/execution/DataSourceScanExec.scala      | 41 +++++++------
 .../DataSourceScanExecRedactionSuite.scala      | 60 ++++++++++++++++++++
 6 files changed, 138 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/91fa80fe/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index a177e66..d87619a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -18,6 +18,9 @@
 package org.apache.spark.internal.config
 
 import java.util.concurrent.TimeUnit
+import java.util.regex.PatternSyntaxException
+
+import scala.util.matching.Regex
 
 import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 
@@ -65,6 +68,13 @@ private object ConfigHelpers {
 
   def byteToString(v: Long, unit: ByteUnit): String = unit.convertTo(v, ByteUnit.BYTE) + "b"
 
+  def regexFromString(str: String, key: String): Regex = {
+    try str.r catch {
+      case e: PatternSyntaxException =>
+        throw new IllegalArgumentException(s"$key should be a regex, but was $str", e)
+    }
+  }
+
 }
 
 /**
@@ -214,4 +224,7 @@ private[spark] case class ConfigBuilder(key: String) {
     new FallbackConfigEntry(key, _doc, _public, fallback)
   }
 
+  def regexConf: TypedConfigBuilder[Regex] = {
+    new TypedConfigBuilder(this, regexFromString(_, this.key), _.regex)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/91fa80fe/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 223c921..89aeea4 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -246,8 +246,16 @@ package object config {
         "driver and executor environments contain sensitive information. When this regex matches " +
         "a property, its value is redacted from the environment UI and various logs like YARN " +
         "and event logs.")
-      .stringConf
-      .createWithDefault("(?i)secret|password")
+      .regexConf
+      .createWithDefault("(?i)secret|password".r)
+
+  private[spark] val STRING_REDACTION_PATTERN =
+    ConfigBuilder("spark.redaction.string.regex")
+      .doc("Regex to decide which parts of strings produced by Spark contain sensitive " +
+        "information. When this regex matches a string part, that string part is replaced by a " +
+        "dummy value. This is currently used to redact the output of SQL explain commands.")
+      .regexConf
+      .createOptional
 
   private[spark] val NETWORK_AUTH_ENABLED =
     ConfigBuilder("spark.authenticate")

http://git-wip-us.apache.org/repos/asf/spark/blob/91fa80fe/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1af34e3..943dde0 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2585,13 +2585,26 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  private[util] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"
+  private[spark] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"
 
+  /**
+   * Redact the sensitive values in the given map. If a map key matches the redaction pattern then
+   * its value is replaced with a dummy text.
+   */
   def redact(conf: SparkConf, kvs: Seq[(String, String)]): Seq[(String, String)] = {
-    val redactionPattern = conf.get(SECRET_REDACTION_PATTERN).r
+    val redactionPattern = conf.get(SECRET_REDACTION_PATTERN)
     redact(redactionPattern, kvs)
   }
 
+  /**
+   * Redact the sensitive information in the given string.
+   */
+  def redact(conf: SparkConf, text: String): String = {
+    if (text == null || text.isEmpty || !conf.contains(STRING_REDACTION_PATTERN)) return text
+    val regex = conf.get(STRING_REDACTION_PATTERN).get
+    regex.replaceAllIn(text, REDACTION_REPLACEMENT_TEXT)
+  }
+
   private def redact(redactionPattern: Regex, kvs: Seq[(String, String)]): Seq[(String, String)] = {
     kvs.map { kv =>
       redactionPattern.findFirstIn(kv._1)

http://git-wip-us.apache.org/repos/asf/spark/blob/91fa80fe/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index 71eed46..f3756b2 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -19,9 +19,6 @@ package org.apache.spark.internal.config
 
 import java.util.concurrent.TimeUnit
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable.HashMap
-
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.util.SparkConfWithEnv
@@ -98,6 +95,21 @@ class ConfigEntrySuite extends SparkFunSuite {
     assert(conf.get(bytes) === 1L)
   }
 
+  test("conf entry: regex") {
+    val conf = new SparkConf()
+    val rConf = ConfigBuilder(testKey("regex")).regexConf.createWithDefault(".*".r)
+
+    conf.set(rConf, "[0-9a-f]{8}".r)
+    assert(conf.get(rConf).regex === "[0-9a-f]{8}")
+
+    conf.set(rConf.key, "[0-9a-f]{4}")
+    assert(conf.get(rConf).regex === "[0-9a-f]{4}")
+
+    conf.set(rConf.key, "[.")
+    val e = intercept[IllegalArgumentException](conf.get(rConf))
+    assert(e.getMessage.contains("regex should be a regex, but was"))
+  }
+
   test("conf entry: string seq") {
     val conf = new SparkConf()
     val seq = ConfigBuilder(testKey("seq")).stringConf.toSequence.createWithDefault(Seq())
@@ -239,5 +251,4 @@ class ConfigEntrySuite extends SparkFunSuite {
       .createWithDefault(null)
     testEntryRef(nullConf, ref(nullConf))
   }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/91fa80fe/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index bfe9c8e..28156b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -41,9 +41,33 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
   val relation: BaseRelation
   val metastoreTableIdentifier: Option[TableIdentifier]
 
+  protected val nodeNamePrefix: String = ""
+
   override val nodeName: String = {
     s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
   }
+
+  override def simpleString: String = {
+    val metadataEntries = metadata.toSeq.sorted.map {
+      case (key, value) =>
+        key + ": " + StringUtils.abbreviate(redact(value), 100)
+    }
+    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
+    s"$nodeNamePrefix$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
+  }
+
+  override def verboseString: String = redact(super.verboseString)
+
+  override def treeString(verbose: Boolean, addSuffix: Boolean): String = {
+    redact(super.treeString(verbose, addSuffix))
+  }
+
+  /**
+   * Redact `text` with the active session's conf; returns it unchanged if no session is active.
+   */
+  private def redact(text: String): String = {
+    SparkSession.getActiveSession.fold(text)(s => Utils.redact(s.sparkContext.conf, text))
+  }
 }
 
 /** Physical plan node for scanning data from a relation. */
@@ -85,15 +109,6 @@ case class RowDataSourceScanExec(
     }
   }
 
-  override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
-      key + ": " + StringUtils.abbreviate(value, 100)
-    }
-
-    s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" +
-      s"${Utils.truncatedString(metadataEntries, " ", ", ", "")}"
-  }
-
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     rdd :: Nil
   }
@@ -307,13 +322,7 @@ case class FileSourceScanExec(
     }
   }
 
-  override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
-      key + ": " + StringUtils.abbreviate(value, 100)
-    }
-    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
-    s"File$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
-  }
+  override val nodeNamePrefix: String = "File"
 
   override protected def doProduce(ctx: CodegenContext): String = {
     if (supportsBatch) {

http://git-wip-us.apache.org/repos/asf/spark/blob/91fa80fe/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
new file mode 100644
index 0000000..986fa87
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+/**
+ * Suite that tests the redaction of DataSourceScanExec
+ */
+class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
+
+  import Utils._
+
+  override def beforeAll(): Unit = {
+    sparkConf.set("spark.redaction.string.regex",
+      "spark-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
+    super.beforeAll()
+  }
+
+  test("treeString is redacted") {
+    withTempDir { dir =>
+      val basePath = dir.getCanonicalPath
+      spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+      val df = spark.read.parquet(basePath)
+
+      val rootPath = df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+        .asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head
+      assert(rootPath.toString.contains(basePath.toString))
+
+      assert(!df.queryExecution.sparkPlan.treeString(verbose = true).contains(rootPath.getName))
+      assert(!df.queryExecution.executedPlan.treeString(verbose = true).contains(rootPath.getName))
+      assert(!df.queryExecution.toString.contains(rootPath.getName))
+      assert(!df.queryExecution.simpleString.contains(rootPath.getName))
+
+      val replacement = "*********"
+      assert(df.queryExecution.sparkPlan.treeString(verbose = true).contains(replacement))
+      assert(df.queryExecution.executedPlan.treeString(verbose = true).contains(replacement))
+      assert(df.queryExecution.toString.contains(replacement))
+      assert(df.queryExecution.simpleString.contains(replacement))
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org