Posted to commits@spark.apache.org by gu...@apache.org on 2018/12/02 09:38:31 UTC
spark git commit: [SPARK-26208][SQL] add headers to empty csv files when header=true
Repository: spark
Updated Branches:
refs/heads/master 031bd80e4 -> c7d95cced
[SPARK-26208][SQL] add headers to empty csv files when header=true
## What changes were proposed in this pull request?
Write the header to empty CSV files when header=true; otherwise the resulting files are invalid when read back, because the header row is missing.
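As a quick illustration (not part of the patch), the sketch below writes an empty DataFrame with header=true and reads it back; before this change the written part files contained no header row, so the column names were lost on the round trip. The local SparkSession setup and the /tmp output path are assumptions made only for this example.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical local session for the example.
val spark = SparkSession.builder().master("local[*]").appName("empty-csv-header").getOrCreate()
import spark.implicits._

// An empty DataFrame with a known schema.
val empty = Seq.empty[(String, Int)].toDF("name", "age")

// With header=true, the written files should now contain the header row
// ("name,age") even though there are no data rows.
empty.write.option("header", true).csv("/tmp/empty-csv")

// Reading back with header=true recovers the column names
// (both as strings, since inferSchema is off by default).
val back = spark.read.option("header", true).csv("/tmp/empty-csv")
back.printSchema()
```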
## How was this patch tested?
Added a test to CSVSuite that round-trips an empty DataFrame to a CSV file with headers and back.
Closes #23173 from koertkuipers/feat-empty-csv-with-header.
Authored-by: Koert Kuipers <ko...@tresata.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7d95cce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7d95cce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7d95cce
Branch: refs/heads/master
Commit: c7d95ccedf593edf9fda9ecaf8d0b4dda451440d
Parents: 031bd80
Author: Koert Kuipers <ko...@tresata.com>
Authored: Sun Dec 2 17:38:25 2018 +0800
Committer: Hyukjin Kwon <gu...@apache.org>
Committed: Sun Dec 2 17:38:25 2018 +0800
----------------------------------------------------------------------
.../sql/catalyst/csv/UnivocityGenerator.scala | 9 ++++----
.../datasources/csv/CSVFileFormat.scala | 22 +++++++++++++-------
.../execution/datasources/csv/CSVSuite.scala | 13 ++++++++++++
3 files changed, 31 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c7d95cce/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index 1218f92..2ab376c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -32,7 +32,6 @@ class UnivocityGenerator(
private val writerSettings = options.asWriterSettings
writerSettings.setHeaders(schema.fieldNames: _*)
private val gen = new CsvWriter(writer, writerSettings)
- private var printHeader = options.headerFlag
// A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`.
// When the value is null, this converter should not be called.
@@ -72,15 +71,15 @@ class UnivocityGenerator(
values
}
+ def writeHeaders(): Unit = {
+ gen.writeHeaders()
+ }
+
/**
* Writes a single InternalRow to CSV using Univocity.
*/
def write(row: InternalRow): Unit = {
- if (printHeader) {
- gen.writeHeaders()
- }
gen.writeRow(convertRow(row): _*)
- printHeader = false
}
def writeToString(row: InternalRow): String = {
http://git-wip-us.apache.org/repos/asf/spark/blob/c7d95cce/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 4c5a1d3..f7d8a9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -171,15 +171,21 @@ private[csv] class CsvOutputWriter(
private var univocityGenerator: Option[UnivocityGenerator] = None
- override def write(row: InternalRow): Unit = {
- val gen = univocityGenerator.getOrElse {
- val charset = Charset.forName(params.charset)
- val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
- val newGen = new UnivocityGenerator(dataSchema, os, params)
- univocityGenerator = Some(newGen)
- newGen
- }
+ if (params.headerFlag) {
+ val gen = getGen()
+ gen.writeHeaders()
+ }
+ private def getGen(): UnivocityGenerator = univocityGenerator.getOrElse {
+ val charset = Charset.forName(params.charset)
+ val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
+ val newGen = new UnivocityGenerator(dataSchema, os, params)
+ univocityGenerator = Some(newGen)
+ newGen
+ }
+
+ override def write(row: InternalRow): Unit = {
+ val gen = getGen()
gen.write(row)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c7d95cce/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index e14e8d4..bc950f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1987,6 +1987,19 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
assert(errMsg2.contains("'lineSep' can contain only 1 character"))
}
+ test("SPARK-26208: write and read empty data to csv file with headers") {
+ withTempPath { path =>
+ val df1 = spark.range(10).repartition(2).filter(_ < 0).map(_.toString).toDF
+ // we have 2 partitions but they are both empty and will be filtered out upon writing
+ // thanks to SPARK-23271 one new empty partition will be inserted
+ df1.write.format("csv").option("header", true).save(path.getAbsolutePath)
+ val df2 = spark.read.format("csv").option("header", true).option("inferSchema", false)
+ .load(path.getAbsolutePath)
+ assert(df1.schema === df2.schema)
+ checkAnswer(df1, df2)
+ }
+ }
+
test("do not produce empty files for empty partitions") {
withTempPath { dir =>
val path = dir.getCanonicalPath