Posted to commits@spark.apache.org by do...@apache.org on 2019/05/29 20:32:55 UTC
[spark] branch master updated: [SPARK-27849][SQL] Redact treeString of FileTable and DataSourceV2ScanExecBase
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c1007c2 [SPARK-27849][SQL] Redact treeString of FileTable and DataSourceV2ScanExecBase
c1007c2 is described below
commit c1007c2f7c4fafd1568e08d45164af0956c445b7
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Wed May 29 13:32:21 2019 -0700
[SPARK-27849][SQL] Redact treeString of FileTable and DataSourceV2ScanExecBase
## What changes were proposed in this pull request?
As a follow-up to https://github.com/apache/spark/pull/17397: the output of FileTable and DataSourceV2ScanExecBase can contain sensitive information (such as Amazon keys), which should not end up in logs or be exposed to non-privileged users.
This PR adds a redaction facility for these outputs. Users can enable it by setting a regex in the same spark.redaction.string.regex configuration that the V1 file sources honor.
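A minimal sketch of how a user might enable this, assuming a local session and a throwaway path (the regex mirrors the one used in the test suite below; fragments of plan strings that match it are replaced with a redaction marker):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session setup for illustration; the
// spark.redaction.string.regex setting is what this PR acts on.
val spark = SparkSession.builder()
  .appName("redaction-demo")
  .master("local[*]")
  .config("spark.redaction.string.regex", "file:/[\\w-_@/]+")
  .getOrCreate()

spark.range(0, 10).write.orc("/tmp/redaction-demo")
val df = spark.read.orc("/tmp/redaction-demo")
// With the config set, local file paths in the scan's
// treeString/explain output are masked rather than printed.
df.explain()
```

Once the regex is set, any matching fragment of the scan description is masked, so explain output and logs no longer leak the underlying paths.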
## How was this patch tested?
Unit tests: the existing DataSourceScanExecRedactionSuite is refactored into a shared base, and a new DataSourceV2ScanExecRedactionSuite covers the V2 code path (see the test diff below).
Closes #24719 from gengliangwang/RedactionSuite.
Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../datasources/v2/DataSourceV2ScanExecBase.scala | 5 +-
.../datasources/v2/FileDataSourceV2.scala | 11 ++-
.../sql/execution/datasources/v2/FileScan.scala | 17 ++++
.../sql/execution/datasources/v2/orc/OrcScan.scala | 4 +
.../DataSourceScanExecRedactionSuite.scala | 96 ++++++++++++++++++----
5 files changed, 114 insertions(+), 19 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index da71e78..9ad683f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning}
+import org.apache.spark.util.Utils
trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
@@ -35,7 +36,9 @@ trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
def readerFactory: PartitionReaderFactory
override def simpleString(maxFields: Int): String = {
- s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}"
+ val result =
+ s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}"
+ Utils.redact(sqlContext.sessionState.conf.stringRedactionPattern, result)
}
override def outputPartitioning: physical.Partitioning = scan match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index d60d429..bcb10ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -17,12 +17,14 @@
package org.apache.spark.sql.execution.datasources.v2
import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.TableProvider
import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
/**
* A base interface for data source v2 implementations of the built-in file-based data sources.
@@ -49,6 +51,13 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
}
protected def getTableName(paths: Seq[String]): String = {
- shortName() + ":" + paths.mkString(";")
+ val name = shortName() + " " + paths.map(qualifiedPathName).mkString(",")
+ Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, name)
+ }
+
+ private def qualifiedPathName(path: String): String = {
+ val hdfsPath = new Path(path)
+ val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+ hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 84a1274..b2f3c4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2
import java.util.{Locale, OptionalLong}
+import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -29,6 +30,7 @@ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
abstract class FileScan(
sparkSession: SparkSession,
@@ -42,6 +44,21 @@ abstract class FileScan(
false
}
+ override def description(): String = {
+ val locationDesc =
+ fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")
+ val metadata: Map[String, String] = Map(
+ "ReadSchema" -> readDataSchema.catalogString,
+ "Location" -> locationDesc)
+ val metadataStr = metadata.toSeq.sorted.map {
+ case (key, value) =>
+ val redactedValue =
+ Utils.redact(sparkSession.sessionState.conf.stringRedactionPattern, value)
+ key + ": " + StringUtils.abbreviate(redactedValue, 100)
+ }.mkString(", ")
+ s"${this.getClass.getSimpleName} $metadataStr"
+ }
+
protected def partitions: Seq[FilePartition] = {
val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty)
val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index b129c94..a4fb034 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -58,4 +58,8 @@ case class OrcScan(
}
override def hashCode(): Int = getClass.hashCode()
+
+ override def description(): String = {
+ super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
index 11a1c9a..ec59459 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
@@ -19,26 +19,35 @@ package org.apache.spark.sql.execution
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
/**
- * Suite that tests the redaction of DataSourceScanExec
+ * Test suite base for testing the redaction of DataSourceScanExec/BatchScanExec.
*/
-class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
+abstract class DataSourceScanRedactionTest extends QueryTest with SharedSQLContext {
override protected def sparkConf: SparkConf = super.sparkConf
- .set("spark.redaction.string.regex", "file:/[\\w_]+")
+ .set("spark.redaction.string.regex", "file:/[\\w-_@/]+")
+
+ final protected def isIncluded(queryExecution: QueryExecution, msg: String): Boolean = {
+ queryExecution.toString.contains(msg) ||
+ queryExecution.simpleString.contains(msg) ||
+ queryExecution.stringWithStats.contains(msg)
+ }
+
+ protected def getRootPath(df: DataFrame): Path
test("treeString is redacted") {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
- spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
- val df = spark.read.parquet(basePath)
+ spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
+ val df = spark.read.orc(basePath)
- val rootPath = df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
- .asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head
+ val rootPath = getRootPath(df)
assert(rootPath.toString.contains(dir.toURI.getPath.stripSuffix("/")))
assert(!df.queryExecution.sparkPlan.treeString(verbose = true).contains(rootPath.getName))
@@ -53,18 +62,24 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
assert(df.queryExecution.simpleString.contains(replacement))
}
}
+}
- private def isIncluded(queryExecution: QueryExecution, msg: String): Boolean = {
- queryExecution.toString.contains(msg) ||
- queryExecution.simpleString.contains(msg) ||
- queryExecution.stringWithStats.contains(msg)
- }
+/**
+ * Suite that tests the redaction of DataSourceScanExec
+ */
+class DataSourceScanExecRedactionSuite extends DataSourceScanRedactionTest {
+ override protected def sparkConf: SparkConf = super.sparkConf
+ .set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "orc")
+
+ override protected def getRootPath(df: DataFrame): Path =
+ df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+ .asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head
test("explain is redacted using SQLConf") {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
- spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
- val df = spark.read.parquet(basePath)
+ spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
+ val df = spark.read.orc(basePath)
val replacement = "*********"
// Respect SparkConf and replace file:/
@@ -86,8 +101,8 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
test("FileSourceScanExec metadata") {
withTempPath { path =>
val dir = path.getCanonicalPath
- spark.range(0, 10).write.parquet(dir)
- val df = spark.read.parquet(dir)
+ spark.range(0, 10).write.orc(dir)
+ val df = spark.read.orc(dir)
assert(isIncluded(df.queryExecution, "Format"))
assert(isIncluded(df.queryExecution, "ReadSchema"))
@@ -98,5 +113,52 @@ class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
assert(isIncluded(df.queryExecution, "Location"))
}
}
+}
+
+/**
+ * Suite that tests the redaction of BatchScanExec.
+ */
+class DataSourceV2ScanExecRedactionSuite extends DataSourceScanRedactionTest {
+ override protected def sparkConf: SparkConf = super.sparkConf
+ .set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "")
+
+ override protected def getRootPath(df: DataFrame): Path =
+ df.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get
+ .asInstanceOf[BatchScanExec].scan.asInstanceOf[OrcScan].fileIndex.rootPaths.head
+
+ test("explain is redacted using SQLConf") {
+ withTempDir { dir =>
+ val basePath = dir.getCanonicalPath
+ spark.range(0, 10).toDF("a").write.orc(new Path(basePath, "foo=1").toString)
+ val df = spark.read.orc(basePath)
+ val replacement = "*********"
+
+ // Respect SparkConf and replace file:/
+ assert(isIncluded(df.queryExecution, replacement))
+ assert(isIncluded(df.queryExecution, "BatchScan"))
+ assert(!isIncluded(df.queryExecution, "file:/"))
+
+ withSQLConf(SQLConf.SQL_STRING_REDACTION_PATTERN.key -> "(?i)BatchScan") {
+ // Respect SQLConf and replace FileScan
+ assert(isIncluded(df.queryExecution, replacement))
+
+ assert(!isIncluded(df.queryExecution, "BatchScan"))
+ assert(isIncluded(df.queryExecution, "file:/"))
+ }
+ }
+ }
+
+ test("FileScan description") {
+ withTempPath { path =>
+ val dir = path.getCanonicalPath
+ spark.range(0, 10).write.orc(dir)
+ val df = spark.read.orc(dir)
+
+ assert(isIncluded(df.queryExecution, "ReadSchema"))
+ assert(isIncluded(df.queryExecution, "BatchScan"))
+ assert(isIncluded(df.queryExecution, "PushedFilters"))
+ assert(isIncluded(df.queryExecution, "Location"))
+ }
+ }
}