You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/09/08 00:51:39 UTC
[spark] branch branch-2.4 updated: [SPARK-32810][SQL][2.4] CSV/JSON
data sources should avoid globbing paths when inferring schema
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new bc471f3 [SPARK-32810][SQL][2.4] CSV/JSON data sources should avoid globbing paths when inferring schema
bc471f3 is described below
commit bc471f3ec76618409fc9cc5791ddd2e3beca9c5c
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Tue Sep 8 09:45:17 2020 +0900
[SPARK-32810][SQL][2.4] CSV/JSON data sources should avoid globbing paths when inferring schema
### What changes were proposed in this pull request?
In this PR, I propose fixing an issue in the CSV and JSON data sources in Spark SQL that occurs when both of the following are true:
* no user specified schema
* some file paths contain escaped glob metacharacters, such as `[`, `]`, `{`, `}`, `*`, etc.
### Why are the changes needed?
To fix the issue where the following two queries try to read from the paths `[abc].csv` and `[abc].json`:
```scala
spark.read.csv("""/tmp/\[abc\].csv""").show
spark.read.json("""/tmp/\[abc\].json""").show
```
Before this fix, both queries failed with an exception like:
```
org.apache.spark.sql.AnalysisException: Path does not exist: file:/tmp/[abc].csv;
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:722)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:392)
```
### Does this PR introduce _any_ user-facing change?
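Yes. Reading CSV or JSON files whose paths contain escaped glob metacharacters without a user-specified schema (as in the queries above) now succeeds instead of failing with `Path does not exist`.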
### How was this patch tested?
Added a new test case in `DataFrameReaderWriterSuite` that covers both the CSV and JSON readers.
Closes #29663 from MaxGekk/globbing-paths-when-inferring-schema-2.4.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../sql/execution/datasources/DataSource.scala | 27 ++++++++++++++++++----
.../execution/datasources/csv/CSVDataSource.scala | 2 +-
.../datasources/json/JsonDataSource.scala | 2 +-
.../sql/test/DataFrameReaderWriterSuite.scala | 23 ++++++++++++++++++
4 files changed, 48 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ff5fe09..31c91f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -65,8 +65,9 @@ import org.apache.spark.util.Utils
* metadata. For example, when reading a partitioned table from a file system, partition columns
* will be inferred from the directory layout even if they are not specified.
*
- * @param paths A list of file system paths that hold data. These will be globbed before and
- * qualified. This option only works when reading from a [[FileFormat]].
+ * @param paths A list of file system paths that hold data. These will be globbed before if
+ * the "__globPaths__" option is true, and will be qualified. This option only works
+ * when reading from a [[FileFormat]].
* @param userSpecifiedSchema An optional specification of the schema of the data. When present
* we skip attempting to infer the schema.
* @param partitionColumns A list of column names that the relation is partitioned by. This list is
@@ -97,6 +98,15 @@ case class DataSource(
private val caseInsensitiveOptions = CaseInsensitiveMap(options)
private val equality = sparkSession.sessionState.conf.resolver
+ /**
+ * Whether or not paths should be globbed before being used to access files.
+ */
+ def globPaths: Boolean = {
+ options.get(DataSource.GLOB_PATHS_KEY)
+ .map(_ == "true")
+ .getOrElse(true)
+ }
+
bucketSpec.map { bucket =>
SchemaUtils.checkColumnNameDuplication(
bucket.bucketColumnNames, "in the bucket definition", equality)
@@ -223,7 +233,7 @@ case class DataSource(
// For glob pattern, we do not check it because the glob pattern might only make sense
// once the streaming job starts and some upstream source starts dropping data.
val hdfsPath = new Path(path)
- if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
+ if (!globPaths || !SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
val fs = hdfsPath.getFileSystem(newHadoopConfiguration())
if (!fs.exists(hdfsPath)) {
throw new AnalysisException(s"Path does not exist: $path")
@@ -550,7 +560,11 @@ case class DataSource(
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+ val globPath = if (globPaths) {
+ SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+ } else {
+ qualified :: Nil
+ }
if (checkEmptyGlobPath && globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
@@ -741,4 +755,9 @@ object DataSource extends Logging {
""".stripMargin)
}
}
+
+ /**
+ * The key in the "options" map for deciding whether or not to glob paths before use.
+ */
+ val GLOB_PATHS_KEY = "__globPaths__"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 62c3c16..ee408c2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -276,7 +276,7 @@ object TextInputCSVDataSource extends CSVDataSource {
sparkSession,
paths = paths,
className = classOf[TextFileFormat].getName,
- options = options.parameters
+ options = options.parameters ++ Map(DataSource.GLOB_PATHS_KEY -> "false")
).resolveRelation(checkFilesExist = false))
.select("value").as[String](Encoders.STRING)
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index d6c5888..a7a3cc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -119,7 +119,7 @@ object TextInputJsonDataSource extends JsonDataSource {
sparkSession,
paths = inputPaths.map(_.getPath.toString),
className = classOf[TextFileFormat].getName,
- options = parsedOptions.parameters
+ options = parsedOptions.parameters.originalMap ++ Map(DataSource.GLOB_PATHS_KEY -> "false")
).resolveRelation(checkFilesExist = false))
.select("value").as(Encoders.STRING)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 2b5b227..6b1a8e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -946,4 +946,27 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
}
}
}
+
+ test("SPARK-32810: CSV and JSON data sources should be able to read files with " +
+ "escaped glob metacharacter in the paths") {
+ def escape(str: String): String = {
+ """(\[|\]|\{|\})""".r.replaceAllIn(str, """\\$1""")
+ }
+
+ withTempDir { dir =>
+ val basePath = dir.getCanonicalPath
+
+ // test CSV writer / reader without specifying schema
+ val csvTableName = "[abc]"
+ spark.range(3).coalesce(1).write.csv(s"$basePath/$csvTableName")
+ val csvDf = spark.read.csv(s"$basePath/${escape(csvTableName)}")
+ assert(csvDf.collect sameElements Array(Row("0"), Row("1"), Row("2")))
+
+ // test JSON writer / reader without specifying schema
+ val jsonTableName = "{def}"
+ spark.range(3).coalesce(1).write.json(s"$basePath/$jsonTableName")
+ val jsonDf = spark.read.json(s"$basePath/${escape(jsonTableName)}")
+ assert(jsonDf.collect sameElements Array(Row(0), Row(1), Row(2)))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org