Posted to commits@spark.apache.org by gu...@apache.org on 2020/09/08 00:51:39 UTC

[spark] branch branch-2.4 updated: [SPARK-32810][SQL][2.4] CSV/JSON data sources should avoid globbing paths when inferring schema

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new bc471f3  [SPARK-32810][SQL][2.4] CSV/JSON data sources should avoid globbing paths when inferring schema
bc471f3 is described below

commit bc471f3ec76618409fc9cc5791ddd2e3beca9c5c
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Tue Sep 8 09:45:17 2020 +0900

    [SPARK-32810][SQL][2.4] CSV/JSON data sources should avoid globbing paths when inferring schema
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to fix an issue in the CSV and JSON data sources in Spark SQL that occurs when both of the following are true (see the sketch below):
    * no user-specified schema
    * some file paths contain escaped glob metacharacters, such as `[`, `]`, `{`, `}`, `*`, etc.
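    
    The issue only surfaces when schema inference runs; supplying a schema explicitly sidestepped it even before the fix. A minimal sketch (the path and schema below are only illustrative):
    ```scala
    import org.apache.spark.sql.types.{StringType, StructType}
    
    // A user-specified schema skips inference, so this read already worked.
    val schema = new StructType().add("value", StringType)
    spark.read.schema(schema).csv("""/tmp/\[abc\].csv""").show()
    
    // Without a schema, inference re-globbed the resolved path and failed (see below).
    spark.read.csv("""/tmp/\[abc\].csv""").show()
    ```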
    
    ### Why are the changes needed?
    To fix the issue where the following two queries try to read from the paths `[abc].csv` and `[abc].json`:
    ```scala
    spark.read.csv("""/tmp/\[abc\].csv""").show
    spark.read.json("""/tmp/\[abc\].json""").show
    ```
    but fail with an exception, because schema inference globs the already-resolved paths a second time and treats `[abc]` as a glob character class that matches no existing file:
    ```
    org.apache.spark.sql.AnalysisException: Path does not exist: file:/tmp/[abc].csv;
      at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:722)
      at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:244)
      at scala.collection.immutable.List.foreach(List.scala:392)
    ```
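    
    With this patch, schema inference reads the already-resolved file paths directly instead of globbing them a second time, so both reads succeed. A minimal sketch of the fixed behaviour (paths and data are illustrative, mirroring the added test):
    ```scala
    // Write directories whose names contain glob metacharacters ...
    spark.range(3).coalesce(1).write.csv("/tmp/[abc].csv")
    spark.range(3).coalesce(1).write.json("/tmp/{def}.json")
    
    // ... and read them back with the metacharacters escaped; no AnalysisException is thrown.
    spark.read.csv("""/tmp/\[abc\].csv""").show()
    spark.read.json("""/tmp/\{def\}.json""").show()
    ```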
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. After the changes, the reads shown above succeed instead of failing with `AnalysisException`.
    
    ### How was this patch tested?
    Added new test cases in `DataFrameReaderWriterSuite`.
    
    Closes #29663 from MaxGekk/globbing-paths-when-inferring-schema-2.4.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../sql/execution/datasources/DataSource.scala     | 27 ++++++++++++++++++----
 .../execution/datasources/csv/CSVDataSource.scala  |  2 +-
 .../datasources/json/JsonDataSource.scala          |  2 +-
 .../sql/test/DataFrameReaderWriterSuite.scala      | 23 ++++++++++++++++++
 4 files changed, 48 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ff5fe09..31c91f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -65,8 +65,9 @@ import org.apache.spark.util.Utils
  * metadata.  For example, when reading a partitioned table from a file system, partition columns
  * will be inferred from the directory layout even if they are not specified.
  *
- * @param paths A list of file system paths that hold data.  These will be globbed before and
- *              qualified. This option only works when reading from a [[FileFormat]].
+ * @param paths A list of file system paths that hold data. These will be globbed before if
+ *              the "__globPaths__" option is true, and will be qualified. This option only works
+ *              when reading from a [[FileFormat]].
  * @param userSpecifiedSchema An optional specification of the schema of the data. When present
  *                            we skip attempting to infer the schema.
  * @param partitionColumns A list of column names that the relation is partitioned by. This list is
@@ -97,6 +98,15 @@ case class DataSource(
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
 
+  /**
+   * Whether or not paths should be globbed before being used to access files.
+   */
+  def globPaths: Boolean = {
+    options.get(DataSource.GLOB_PATHS_KEY)
+      .map(_ == "true")
+      .getOrElse(true)
+  }
+
   bucketSpec.map { bucket =>
     SchemaUtils.checkColumnNameDuplication(
       bucket.bucketColumnNames, "in the bucket definition", equality)
@@ -223,7 +233,7 @@ case class DataSource(
         // For glob pattern, we do not check it because the glob pattern might only make sense
         // once the streaming job starts and some upstream source starts dropping data.
         val hdfsPath = new Path(path)
-        if (!SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
+        if (!globPaths || !SparkHadoopUtil.get.isGlobPath(hdfsPath)) {
           val fs = hdfsPath.getFileSystem(newHadoopConfiguration())
           if (!fs.exists(hdfsPath)) {
             throw new AnalysisException(s"Path does not exist: $path")
@@ -550,7 +560,11 @@ case class DataSource(
       val hdfsPath = new Path(path)
       val fs = hdfsPath.getFileSystem(hadoopConf)
       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+      val globPath = if (globPaths) {
+        SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+      } else {
+        qualified :: Nil
+      }
 
       if (checkEmptyGlobPath && globPath.isEmpty) {
         throw new AnalysisException(s"Path does not exist: $qualified")
@@ -741,4 +755,9 @@ object DataSource extends Logging {
          """.stripMargin)
     }
   }
+
+  /**
+   * The key in the "options" map for deciding whether or not to glob paths before use.
+   */
+  val GLOB_PATHS_KEY = "__globPaths__"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 62c3c16..ee408c2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -276,7 +276,7 @@ object TextInputCSVDataSource extends CSVDataSource {
           sparkSession,
           paths = paths,
           className = classOf[TextFileFormat].getName,
-          options = options.parameters
+          options = options.parameters ++ Map(DataSource.GLOB_PATHS_KEY -> "false")
         ).resolveRelation(checkFilesExist = false))
         .select("value").as[String](Encoders.STRING)
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index d6c5888..a7a3cc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -119,7 +119,7 @@ object TextInputJsonDataSource extends JsonDataSource {
         sparkSession,
         paths = inputPaths.map(_.getPath.toString),
         className = classOf[TextFileFormat].getName,
-        options = parsedOptions.parameters
+        options = parsedOptions.parameters.originalMap ++ Map(DataSource.GLOB_PATHS_KEY -> "false")
       ).resolveRelation(checkFilesExist = false))
       .select("value").as(Encoders.STRING)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 2b5b227..6b1a8e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -946,4 +946,27 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       }
     }
   }
+
+  test("SPARK-32810: CSV and JSON data sources should be able to read files with " +
+    "escaped glob metacharacter in the paths") {
+    def escape(str: String): String = {
+      """(\[|\]|\{|\})""".r.replaceAllIn(str, """\\$1""")
+    }
+
+    withTempDir { dir =>
+      val basePath = dir.getCanonicalPath
+
+      // test CSV writer / reader without specifying schema
+      val csvTableName = "[abc]"
+      spark.range(3).coalesce(1).write.csv(s"$basePath/$csvTableName")
+      val csvDf = spark.read.csv(s"$basePath/${escape(csvTableName)}")
+      assert(csvDf.collect sameElements Array(Row("0"), Row("1"), Row("2")))
+
+      // test JSON writer / reader without specifying schema
+      val jsonTableName = "{def}"
+      spark.range(3).coalesce(1).write.json(s"$basePath/$jsonTableName")
+      val jsonDf = spark.read.json(s"$basePath/${escape(jsonTableName)}")
+      assert(jsonDf.collect sameElements Array(Row(0), Row(1), Row(2)))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org