Posted to commits@spark.apache.org by gu...@apache.org on 2018/12/02 09:38:31 UTC

spark git commit: [SPARK-26208][SQL] add headers to empty csv files when header=true

Repository: spark
Updated Branches:
  refs/heads/master 031bd80e4 -> c7d95cced


[SPARK-26208][SQL] add headers to empty csv files when header=true

## What changes were proposed in this pull request?

Write headers to empty CSV files when header=true, because otherwise these files are invalid when read back: the header row that carries the column names is missing, so the schema is lost.
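
For context, a minimal sketch of the round trip this fixes (the local path, session setup, and column names below are illustrative, not part of the patch):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  val df = Seq.empty[(String, Int)].toDF("name", "age")

  // Before this change the written part files contained no header line,
  // so the column names were lost on read; after it, each file starts
  // with "name,age" and the schema survives the round trip.
  df.write.option("header", true).csv("/tmp/empty-csv")
  val back = spark.read.option("header", true).csv("/tmp/empty-csv")
  assert(back.schema.fieldNames.sameElements(Array("name", "age")))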

## How was this patch tested?

Added a test to CSVSuite that round-trips an empty DataFrame to a CSV file with headers and back.
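
To run just this suite locally, the usual Spark dev workflow applies; for example the following sbt invocation, where -z is ScalaTest's substring name filter (this invocation is illustrative, not part of the patch):

  build/sbt "sql/testOnly *CSVSuite -- -z SPARK-26208"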


Closes #23173 from koertkuipers/feat-empty-csv-with-header.

Authored-by: Koert Kuipers <ko...@tresata.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7d95cce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7d95cce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7d95cce

Branch: refs/heads/master
Commit: c7d95ccedf593edf9fda9ecaf8d0b4dda451440d
Parents: 031bd80
Author: Koert Kuipers <ko...@tresata.com>
Authored: Sun Dec 2 17:38:25 2018 +0800
Committer: Hyukjin Kwon <gu...@apache.org>
Committed: Sun Dec 2 17:38:25 2018 +0800

----------------------------------------------------------------------
 .../sql/catalyst/csv/UnivocityGenerator.scala   |  9 ++++----
 .../datasources/csv/CSVFileFormat.scala         | 22 +++++++++++++-------
 .../execution/datasources/csv/CSVSuite.scala    | 13 ++++++++++++
 3 files changed, 31 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c7d95cce/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
index 1218f92..2ab376c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -32,7 +32,6 @@ class UnivocityGenerator(
   private val writerSettings = options.asWriterSettings
   writerSettings.setHeaders(schema.fieldNames: _*)
   private val gen = new CsvWriter(writer, writerSettings)
-  private var printHeader = options.headerFlag
 
   // A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`.
   // When the value is null, this converter should not be called.
@@ -72,15 +71,15 @@ class UnivocityGenerator(
     values
   }
 
+  def writeHeaders(): Unit = {
+    gen.writeHeaders()
+  }
+
   /**
    * Writes a single InternalRow to CSV using Univocity.
    */
   def write(row: InternalRow): Unit = {
-    if (printHeader) {
-      gen.writeHeaders()
-    }
     gen.writeRow(convertRow(row): _*)
-    printHeader = false
   }
 
   def writeToString(row: InternalRow): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c7d95cce/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 4c5a1d3..f7d8a9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -171,15 +171,21 @@ private[csv] class CsvOutputWriter(
 
   private var univocityGenerator: Option[UnivocityGenerator] = None
 
-  override def write(row: InternalRow): Unit = {
-    val gen = univocityGenerator.getOrElse {
-      val charset = Charset.forName(params.charset)
-      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
-      val newGen = new UnivocityGenerator(dataSchema, os, params)
-      univocityGenerator = Some(newGen)
-      newGen
-    }
+  if (params.headerFlag) {
+    val gen = getGen()
+    gen.writeHeaders()
+  }
 
+  private def getGen(): UnivocityGenerator = univocityGenerator.getOrElse {
+    val charset = Charset.forName(params.charset)
+    val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
+    val newGen = new UnivocityGenerator(dataSchema, os, params)
+    univocityGenerator = Some(newGen)
+    newGen
+  }
+
+  override def write(row: InternalRow): Unit = {
+    val gen = getGen()
     gen.write(row)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c7d95cce/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index e14e8d4..bc950f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1987,6 +1987,19 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     assert(errMsg2.contains("'lineSep' can contain only 1 character"))
   }
 
+  test("SPARK-26208: write and read empty data to csv file with headers") {
+    withTempPath { path =>
+      val df1 = spark.range(10).repartition(2).filter(_ < 0).map(_.toString).toDF
+      // we have 2 partitions but they are both empty and will be filtered out upon writing
+      // thanks to SPARK-23271 one new empty partition will be inserted
+      df1.write.format("csv").option("header", true).save(path.getAbsolutePath)
+      val df2 = spark.read.format("csv").option("header", true).option("inferSchema", false)
+        .load(path.getAbsolutePath)
+      assert(df1.schema === df2.schema)
+      checkAnswer(df1, df2)
+    }
+  }
+
   test("do not produce empty files for empty partitions") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath

