You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/03/08 22:58:45 UTC

spark git commit: [SPARK-23271][SQL] Parquet output contains only _SUCCESS file after writing an empty dataframe

Repository: spark
Updated Branches:
  refs/heads/master e7bbca889 -> d90e77bd0


[SPARK-23271][SQL] Parquet output contains only _SUCCESS file after writing an empty dataframe

## What changes were proposed in this pull request?
Below are the two cases.
``` SQL
case 1

scala> List.empty[String].toDF().rdd.partitions.length
res18: Int = 1
```
When we write the above data frame as parquet, we create a parquet file containing
just the schema of the data frame.

Case 2
``` SQL

scala> val anySchema = StructType(StructField("anyName", StringType, nullable = false) :: Nil)
anySchema: org.apache.spark.sql.types.StructType = StructType(StructField(anyName,StringType,false))
scala> spark.read.schema(anySchema).csv("/tmp/empty_folder").rdd.partitions.length
res22: Int = 0
```
For the 2nd case, since number of partitions = 0, we don't call the write task (the task has logic to create the empty metadata only parquet file)

The fix is to create a dummy single partition RDD and set up the write task based on it to ensure
the metadata-only file.

## How was this patch tested?

A new test is added to DataframeReaderWriterSuite.

Author: Dilip Biswal <db...@us.ibm.com>

Closes #20525 from dilipbiswal/spark-23271.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d90e77bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d90e77bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d90e77bd

Branch: refs/heads/master
Commit: d90e77bd0ec19f8ba9198a24ec2ab3db7708eca8
Parents: e7bbca8
Author: Dilip Biswal <db...@us.ibm.com>
Authored: Thu Mar 8 14:58:40 2018 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Mar 8 14:58:40 2018 -0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md                     |  1 +
 .../execution/datasources/FileFormatWriter.scala  | 15 ++++++++++++---
 .../spark/sql/FileBasedDataSourceSuite.scala      | 18 ++++++++++++++++++
 .../sql/test/DataFrameReaderWriterSuite.scala     |  1 -
 4 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d90e77bd/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 451b814..d2132d2 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1805,6 +1805,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
 
   - Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively.
   - In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unabled to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched off by `spark.sql.execution.arrow.fallback.enabled`.
+ - Since Spark 2.4, writing an empty dataframe to a directory launches at least one write task, even if physically the dataframe has no partition. This introduces a small behavior change that for self-describing file formats like Parquet and Orc, Spark creates a metadata-only file in the target directory when writing a 0-partition dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent regarding writing empty dataframe.
 
 ## Upgrading From Spark SQL 2.2 to 2.3
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d90e77bd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 1d80a69..401597f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -190,9 +190,18 @@ object FileFormatWriter extends Logging {
           global = false,
           child = plan).execute()
       }
-      val ret = new Array[WriteTaskResult](rdd.partitions.length)
+
+      // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
+      // partition rdd to make sure we at least set up one write task to write the metadata.
+      val rddWithNonEmptyPartitions = if (rdd.partitions.length == 0) {
+        sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1)
+      } else {
+        rdd
+      }
+
+      val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
       sparkSession.sparkContext.runJob(
-        rdd,
+        rddWithNonEmptyPartitions,
         (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
           executeTask(
             description = description,
@@ -202,7 +211,7 @@ object FileFormatWriter extends Logging {
             committer,
             iterator = iter)
         },
-        0 until rdd.partitions.length,
+        rddWithNonEmptyPartitions.partitions.indices,
         (index, res: WriteTaskResult) => {
           committer.onTaskCommit(res.commitMsg)
           ret(index) = res

http://git-wip-us.apache.org/repos/asf/spark/blob/d90e77bd/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 73e3df3..bd3071b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -89,6 +90,23 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
     }
   }
 
+  Seq("orc", "parquet").foreach { format =>
+    test(s"SPARK-23271 empty RDD when saved should write a metadata only file - $format") {
+      withTempPath { outputPath =>
+        val df = spark.emptyDataFrame.select(lit(1).as("i"))
+        df.write.format(format).save(outputPath.toString)
+        val partFiles = outputPath.listFiles()
+          .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+        assert(partFiles.length === 1)
+
+        // Now read the file.
+        val df1 = spark.read.format(format).load(outputPath.toString)
+        checkAnswer(df1, Seq.empty[Row])
+        assert(df1.schema.equals(df.schema.asNullable))
+      }
+    }
+  }
+
   allFileBasedDataSources.foreach { format =>
     test(s"SPARK-22146 read files containing special characters using $format") {
       withTempDir { dir =>

http://git-wip-us.apache.org/repos/asf/spark/blob/d90e77bd/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 8c9bb7d..a707a88 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -301,7 +301,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       intercept[AnalysisException] {
         spark.range(10).write.format("csv").mode("overwrite").partitionBy("id").save(path)
       }
-      spark.emptyDataFrame.write.format("parquet").mode("overwrite").save(path)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org