You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/03/08 22:58:45 UTC
spark git commit: [SPARK-23271][SQL] Parquet output contains only
_SUCCESS file after writing an empty dataframe
Repository: spark
Updated Branches:
refs/heads/master e7bbca889 -> d90e77bd0
[SPARK-23271][SQL] Parquet output contains only _SUCCESS file after writing an empty dataframe
## What changes were proposed in this pull request?
Below are the two cases.
Case 1
``` SQL
scala> List.empty[String].toDF().rdd.partitions.length
res18: Int = 1
```
When we write the above data frame as parquet, we create a parquet file containing
just the schema of the data frame.
Case 2
``` SQL
scala> val anySchema = StructType(StructField("anyName", StringType, nullable = false) :: Nil)
anySchema: org.apache.spark.sql.types.StructType = StructType(StructField(anyName,StringType,false))
scala> spark.read.schema(anySchema).csv("/tmp/empty_folder").rdd.partitions.length
res22: Int = 0
```
For the 2nd case, since the number of partitions is 0, we don't launch any write task (the task has the logic to create the empty, metadata-only parquet file).
The fix is to create a dummy single-partition RDD and set up the write task based on it to ensure
that the metadata-only file is written.
## How was this patch tested?
A new test is added to FileBasedDataSourceSuite.
Author: Dilip Biswal <db...@us.ibm.com>
Closes #20525 from dilipbiswal/spark-23271.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d90e77bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d90e77bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d90e77bd
Branch: refs/heads/master
Commit: d90e77bd0ec19f8ba9198a24ec2ab3db7708eca8
Parents: e7bbca8
Author: Dilip Biswal <db...@us.ibm.com>
Authored: Thu Mar 8 14:58:40 2018 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Mar 8 14:58:40 2018 -0800
----------------------------------------------------------------------
docs/sql-programming-guide.md | 1 +
.../execution/datasources/FileFormatWriter.scala | 15 ++++++++++++---
.../spark/sql/FileBasedDataSourceSuite.scala | 18 ++++++++++++++++++
.../sql/test/DataFrameReaderWriterSuite.scala | 1 -
4 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d90e77bd/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 451b814..d2132d2 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1805,6 +1805,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
- Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively.
- In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unabled to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched off by `spark.sql.execution.arrow.fallback.enabled`.
+ - Since Spark 2.4, writing an empty dataframe to a directory launches at least one write task, even if physically the dataframe has no partition. This introduces a small behavior change that for self-describing file formats like Parquet and Orc, Spark creates a metadata-only file in the target directory when writing a 0-partition dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent regarding writing empty dataframe.
## Upgrading From Spark SQL 2.2 to 2.3
http://git-wip-us.apache.org/repos/asf/spark/blob/d90e77bd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 1d80a69..401597f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -190,9 +190,18 @@ object FileFormatWriter extends Logging {
global = false,
child = plan).execute()
}
- val ret = new Array[WriteTaskResult](rdd.partitions.length)
+
+ // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
+ // partition rdd to make sure we at least set up one write task to write the metadata.
+ val rddWithNonEmptyPartitions = if (rdd.partitions.length == 0) {
+ sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1)
+ } else {
+ rdd
+ }
+
+ val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
sparkSession.sparkContext.runJob(
- rdd,
+ rddWithNonEmptyPartitions,
(taskContext: TaskContext, iter: Iterator[InternalRow]) => {
executeTask(
description = description,
@@ -202,7 +211,7 @@ object FileFormatWriter extends Logging {
committer,
iterator = iter)
},
- 0 until rdd.partitions.length,
+ rddWithNonEmptyPartitions.partitions.indices,
(index, res: WriteTaskResult) => {
committer.onTaskCommit(res.commitMsg)
ret(index) = res
http://git-wip-us.apache.org/repos/asf/spark/blob/d90e77bd/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 73e3df3..bd3071b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkException
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -89,6 +90,23 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
}
}
+ Seq("orc", "parquet").foreach { format =>
+ test(s"SPARK-23271 empty RDD when saved should write a metadata only file - $format") {
+ withTempPath { outputPath =>
+ val df = spark.emptyDataFrame.select(lit(1).as("i"))
+ df.write.format(format).save(outputPath.toString)
+ val partFiles = outputPath.listFiles()
+ .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+ assert(partFiles.length === 1)
+
+ // Now read the file.
+ val df1 = spark.read.format(format).load(outputPath.toString)
+ checkAnswer(df1, Seq.empty[Row])
+ assert(df1.schema.equals(df.schema.asNullable))
+ }
+ }
+ }
+
allFileBasedDataSources.foreach { format =>
test(s"SPARK-22146 read files containing special characters using $format") {
withTempDir { dir =>
http://git-wip-us.apache.org/repos/asf/spark/blob/d90e77bd/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 8c9bb7d..a707a88 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -301,7 +301,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
intercept[AnalysisException] {
spark.range(10).write.format("csv").mode("overwrite").partitionBy("id").save(path)
}
- spark.emptyDataFrame.write.format("parquet").mode("overwrite").save(path)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org