Posted to commits@spark.apache.org by we...@apache.org on 2020/08/20 17:22:57 UTC

[spark] branch branch-3.0 updated: [SPARK-32621][SQL][3.0] 'path' option can cause issues while inferring schema in CSV/JSON datasources

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8755e3f  [SPARK-32621][SQL][3.0] 'path' option can cause issues while inferring schema in CSV/JSON datasources
8755e3f is described below

commit 8755e3f42f2abf8cbbf2be02dafefdab3d3a1766
Author: Terry Kim <yu...@gmail.com>
AuthorDate: Thu Aug 20 17:18:29 2020 +0000

    [SPARK-32621][SQL][3.0] 'path' option can cause issues while inferring schema in CSV/JSON datasources
    
    ### What changes were proposed in this pull request?
    
    This PR backports #29437 to branch-3.0.
    
    When CSV/JSON datasources infer schema (e.g., `def inferSchema(files: Seq[FileStatus])`), they use the `files` along with the original options. `files` in `inferSchema` could have been deduced from the "path" option if the option was present, so this can cause issues (e.g., reading more data, listing the path again) since the "path" option is **added** to the `files`.
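
    To illustrate the mechanism, here is a minimal, self-contained sketch (an assumed simplification, not Spark's actual code): if the "path" option survives into the options used during inference, path resolution appends it to paths that have already been listed.

    ```scala
    // Assumed simplification of the double-listing problem.
    val explicitPaths = Seq("/tmp/data")  // already listed for inference
    val options = Map("path" -> "/tmp/data", "samplingRatio" -> "0.1")

    // Path resolution appends the "path" option to the explicit paths:
    val resolved = explicitPaths ++ options.get("path")
    assert(resolved == Seq("/tmp/data", "/tmp/data"))  // the same path, twice

    // The fix: strip "path" from the options before re-resolving, as this
    // patch does with `caseInsensitiveOptions - "path"` in DataSource.scala.
    val fixed = explicitPaths ++ (options - "path").get("path")
    assert(fixed == Seq("/tmp/data"))
    ```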
    
    ### Why are the changes needed?
    
    The current behavior can cause the following issue:
    ```scala
    class TestFileFilter extends PathFilter {
      override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
    }
    
    val path = "/tmp"
    val df = spark.range(2)
    df.write.json(path + "/p=1")
    df.write.json(path + "/p=2")
    
    val extraOptions = Map(
      "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
      "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
    )
    
    // This works fine.
    assert(spark.read.options(extraOptions).json(path).count == 2)
    
    // The following with "path" option fails with the following:
    // assertion failed: Conflicting directory structures detected. Suspicious paths
    //	file:/tmp
    //	file:/tmp/p=1
    assert(spark.read.options(extraOptions).format("json").option("path", path).load.count() === 2)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the above failure no longer happens, and you get a consistent experience whether you use `spark.read.csv(path)` or `spark.read.format("csv").option("path", path).load`.
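
    For example, both of the following reads now behave identically (a minimal sketch; `path` is assumed to be any directory containing CSV files):

    ```scala
    // Equivalent after this patch: the "path" option no longer causes the
    // location to be listed again during schema inference.
    val viaApi = spark.read.csv(path)
    val viaOption = spark.read.format("csv").option("path", path).load
    assert(viaApi.schema == viaOption.schema)
    ```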
    
    ### How was this patch tested?
    
    Updated existing tests.
    
    Closes #29478 from imback82/backport-29437.
    
    Authored-by: Terry Kim <yu...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/v2/avro/AvroDataSourceV2.scala       |  6 ++++--
 .../sql/execution/datasources/DataSource.scala     |  8 ++++++--
 .../execution/datasources/csv/CSVDataSource.scala  |  2 +-
 .../datasources/json/JsonDataSource.scala          |  2 +-
 .../datasources/v2/FileDataSourceV2.scala          |  7 +++++++
 .../datasources/v2/csv/CSVDataSourceV2.scala       |  6 ++++--
 .../datasources/v2/json/JsonDataSourceV2.scala     |  6 ++++--
 .../datasources/v2/orc/OrcDataSourceV2.scala       |  6 ++++--
 .../v2/parquet/ParquetDataSourceV2.scala           |  7 +++++--
 .../datasources/v2/text/TextDataSourceV2.scala     |  6 ++++--
 .../sql/execution/datasources/csv/CSVSuite.scala   | 16 +++++++++++++--
 .../sql/execution/datasources/json/JsonSuite.scala | 24 +++++++++++++++++++---
 12 files changed, 75 insertions(+), 21 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
index 969dee0..2d21c49 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
@@ -32,12 +32,14 @@ class AvroDataSourceV2 extends FileDataSourceV2 {
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(options, paths)
-    AvroTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
+    val optionsWithoutPaths = getOptionsWithoutPaths(options)
+    AvroTable(tableName, sparkSession, optionsWithoutPaths, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(options, paths)
-    AvroTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
+    val optionsWithoutPaths = getOptionsWithoutPaths(options)
+    AvroTable(tableName, sparkSession, optionsWithoutPaths, paths, Some(schema), fallbackFileFormat)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 588a9b4..e97139f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -190,9 +190,11 @@ case class DataSource(
     val dataSchema = userSpecifiedSchema.map { schema =>
       StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
     }.orElse {
+      // Remove "path" option so that it is not added to the paths returned by
+      // `tempFileIndex.allFiles()`.
       format.inferSchema(
         sparkSession,
-        caseInsensitiveOptions,
+        caseInsensitiveOptions - "path",
         tempFileIndex.allFiles())
     }.getOrElse {
       throw new AnalysisException(
@@ -366,9 +368,11 @@ case class DataSource(
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath,
           caseInsensitiveOptions, userSpecifiedSchema)
         val dataSchema = userSpecifiedSchema.orElse {
+          // Remove "path" option so that it is not added to the paths returned by
+          // `fileCatalog.allFiles()`.
           format.inferSchema(
             sparkSession,
-            caseInsensitiveOptions,
+            caseInsensitiveOptions - "path",
             fileCatalog.allFiles())
         }.getOrElse {
           throw new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 375cec5..e8005a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -156,7 +156,7 @@ object TextInputCSVDataSource extends CSVDataSource {
           sparkSession,
           paths = paths,
           className = classOf[TextFileFormat].getName,
-          options = options.parameters
+          options = options.parameters.originalMap
         ).resolveRelation(checkFilesExist = false))
         .select("value").as[String](Encoders.STRING)
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 7ec2267..1d2e63c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -120,7 +120,7 @@ object TextInputJsonDataSource extends JsonDataSource {
         sparkSession,
         paths = inputPaths.map(_.getPath.toString),
         className = classOf[TextFileFormat].getName,
-        options = parsedOptions.parameters
+        options = parsedOptions.parameters.originalMap
       ).resolveRelation(checkFilesExist = false))
       .select("value").as(Encoders.STRING)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index bbe8835..fa871fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -56,6 +56,13 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
     paths ++ Option(map.get("path")).toSeq
   }
 
+  protected def getOptionsWithoutPaths(map: CaseInsensitiveStringMap): CaseInsensitiveStringMap = {
+    val withoutPath = map.asCaseSensitiveMap().asScala.filterKeys { k =>
+      !k.equalsIgnoreCase("path") && !k.equalsIgnoreCase("paths")
+    }
+    new CaseInsensitiveStringMap(withoutPath.asJava)
+  }
+
   protected def getTableName(map: CaseInsensitiveStringMap, paths: Seq[String]): String = {
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(
       map.asCaseSensitiveMap().asScala.toMap)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
index 69d001b..c577cbf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala
@@ -32,12 +32,14 @@ class CSVDataSourceV2 extends FileDataSourceV2 {
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(options, paths)
-    CSVTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
+    val optionsWithoutPaths = getOptionsWithoutPaths(options)
+    CSVTable(tableName, sparkSession, optionsWithoutPaths, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(options, paths)
-    CSVTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
+    val optionsWithoutPaths = getOptionsWithoutPaths(options)
+    CSVTable(tableName, sparkSession, optionsWithoutPaths, paths, Some(schema), fallbackFileFormat)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
index 9c4e3b8..cd0eba0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
@@ -32,13 +32,15 @@ class JsonDataSourceV2 extends FileDataSourceV2 {
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(options, paths)
-    JsonTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
+    val optionsWithoutPaths = getOptionsWithoutPaths(options)
+    JsonTable(tableName, sparkSession, optionsWithoutPaths, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(options, paths)
-    JsonTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
+    val optionsWithoutPaths = getOptionsWithoutPaths(options)
+    JsonTable(tableName, sparkSession, optionsWithoutPaths, paths, Some(schema), fallbackFileFormat)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
index fa2febd..6303723 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala
@@ -32,13 +32,15 @@ class OrcDataSourceV2 extends FileDataSourceV2 {
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(options, paths)
-    OrcTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
+    val optionsWithoutPaths = getOptionsWithoutPaths(options)
+    OrcTable(tableName, sparkSession, optionsWithoutPaths, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(options, paths)
-    OrcTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
+    val optionsWithoutPaths = getOptionsWithoutPaths(options)
+    OrcTable(tableName, sparkSession, optionsWithoutPaths, paths, Some(schema), fallbackFileFormat)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala
index 7e7ca96..4590660 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala
@@ -32,13 +32,16 @@ class ParquetDataSourceV2 extends FileDataSourceV2 {
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(options, paths)
-    ParquetTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
+    val optionsWithoutPaths = getOptionsWithoutPaths(options)
+    ParquetTable(tableName, sparkSession, optionsWithoutPaths, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(options, paths)
-    ParquetTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
+    val optionsWithoutPaths = getOptionsWithoutPaths(options)
+    ParquetTable(
+      tableName, sparkSession, optionsWithoutPaths, paths, Some(schema), fallbackFileFormat)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
index 43bcb61..f375a12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
@@ -32,13 +32,15 @@ class TextDataSourceV2 extends FileDataSourceV2 {
   override def getTable(options: CaseInsensitiveStringMap): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(options, paths)
-    TextTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
+    val optionsWithoutPaths = getOptionsWithoutPaths(options)
+    TextTable(tableName, sparkSession, optionsWithoutPaths, paths, None, fallbackFileFormat)
   }
 
   override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
     val paths = getPaths(options)
     val tableName = getTableName(options, paths)
-    TextTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
+    val optionsWithoutPaths = getOptionsWithoutPaths(options)
+    TextTable(tableName, sparkSession, optionsWithoutPaths, paths, Some(schema), fallbackFileFormat)
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 366cf11..d54146a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1450,10 +1450,22 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa
       val ds = sampledTestData.coalesce(1)
       ds.write.text(path.getAbsolutePath)
 
-      val readback = spark.read
+      val readback1 = spark.read
         .option("inferSchema", true).option("samplingRatio", 0.1)
         .csv(path.getCanonicalPath)
-      assert(readback.schema == new StructType().add("_c0", IntegerType))
+      assert(readback1.schema == new StructType().add("_c0", IntegerType))
+
+      withClue("SPARK-32621: 'path' option can cause issues while inferring schema") {
+        // During infer, "path" option gets added again to the paths that have already been listed.
+        // This results in reading more data than necessary and causes different schema to be
+        // inferred when sampling ratio is involved.
+        val readback2 = spark.read
+          .option("inferSchema", true).option("samplingRatio", 0.1)
+          .option("path", path.getCanonicalPath)
+          .format("csv")
+          .load
+        assert(readback2.schema == new StructType().add("_c0", IntegerType))
+      }
     })
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index d95d4e0..38b5e77 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1556,6 +1556,15 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson
         "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
       )
       assert(spark.read.options(extraOptions).json(path).count() === 2)
+
+      withClue("SPARK-32621: 'path' option can cause issues while inferring schema") {
+        // During infer, "path" option is used again on top of the paths that have already been
+        // listed. When a partition is removed by TestFileFilter, this will cause a conflict while
+        // inferring partitions because the original path in the "path" option will list the
+        // partition directory that has been removed.
+        assert(
+          spark.read.options(extraOptions).format("json").option("path", path).load.count() === 2)
+      }
     }
   }
 
@@ -2141,9 +2150,18 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson
     )(withTempPath { path =>
       val ds = sampledTestData.coalesce(1)
       ds.write.text(path.getAbsolutePath)
-      val readback = spark.read.option("samplingRatio", 0.1).json(path.getCanonicalPath)
-
-      assert(readback.schema == new StructType().add("f1", LongType))
+      val readback1 = spark.read.option("samplingRatio", 0.1).json(path.getCanonicalPath)
+      assert(readback1.schema == new StructType().add("f1", LongType))
+
+      withClue("SPARK-32621: 'path' option can cause issues while inferring schema") {
+        // During infer, "path" option gets added again to the paths that have already been listed.
+        // This results in reading more data than necessary and causes different schema to be
+        // inferred when sampling ratio is involved.
+        val readback2 = spark.read
+          .option("samplingRatio", 0.1).option("path", path.getCanonicalPath)
+          .format("json").load
+        assert(readback2.schema == new StructType().add("f1", LongType))
+      }
     })
   }
 

