Posted to commits@spark.apache.org by gu...@apache.org on 2018/07/25 06:17:26 UTC

spark git commit: [SPARK-19018][SQL] Add support for custom encoding on csv writer

Repository: spark
Updated Branches:
  refs/heads/master afb062753 -> 78e0a725e


[SPARK-19018][SQL] Add support for custom encoding on csv writer

## What changes were proposed in this pull request?

Add support for a custom `encoding` (charset) option on the CSV writer, so output files can be saved in a charset other than the default UTF-8; see https://issues.apache.org/jira/browse/SPARK-19018.
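
As a quick illustration of the new option, here is a minimal usage sketch. It assumes a local `SparkSession` and an illustrative output path (neither is part of this patch); the same `encoding` option is exposed through both the Scala and Python writer APIs in this change.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("csv-encoding-demo")
  .getOrCreate()
import spark.implicits._

// Write a small DataFrame as CSV encoded in ISO-8859-1 instead of the
// default UTF-8. The output path is purely illustrative.
Seq("µß áâä").toDF("value")
  .write
  .option("encoding", "iso-8859-1")
  .csv("/tmp/csv-latin1-demo")
```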

## How was this patch tested?

Added two unit tests in `CSVSuite`: one verifying round-trip writes for several charsets, and one verifying the error raised for an unsupported charset.

Author: crafty-coder <ca...@gmail.com>
Author: Carlos <cr...@users.noreply.github.com>

Closes #20949 from crafty-coder/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78e0a725
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78e0a725
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78e0a725

Branch: refs/heads/master
Commit: 78e0a725e06665cf92d4b8f987ee01947a1d620c
Parents: afb0627
Author: crafty-coder <ca...@gmail.com>
Authored: Wed Jul 25 14:17:20 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Wed Jul 25 14:17:20 2018 +0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                |  7 +++-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  2 +
 .../datasources/csv/CSVFileFormat.scala         |  6 ++-
 .../execution/datasources/csv/CSVSuite.scala    | 39 +++++++++++++++++++-
 4 files changed, 50 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/78e0a725/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 3efe2ad..98b2cd9 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -859,7 +859,7 @@ class DataFrameWriter(OptionUtils):
     def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
             header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
             timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
-            charToEscapeQuoteEscaping=None):
+            charToEscapeQuoteEscaping=None, encoding=None):
         """Saves the content of the :class:`DataFrame` in CSV format at the specified path.
 
         :param path: the path in any Hadoop supported file system
@@ -909,6 +909,8 @@ class DataFrameWriter(OptionUtils):
                                           the quote character. If None is set, the default value is
                                           escape character when escape and quote characters are
                                           different, ``\0`` otherwise..
+        :param encoding: sets the encoding (charset) of saved csv files. If None is set,
+                         the default UTF-8 charset will be used.
 
         >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
         """
@@ -918,7 +920,8 @@ class DataFrameWriter(OptionUtils):
                        dateFormat=dateFormat, timestampFormat=timestampFormat,
                        ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
                        ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace,
-                       charToEscapeQuoteEscaping=charToEscapeQuoteEscaping)
+                       charToEscapeQuoteEscaping=charToEscapeQuoteEscaping,
+                       encoding=encoding)
         self._jwrite.csv(path)
 
     @since(1.5)

http://git-wip-us.apache.org/repos/asf/spark/blob/78e0a725/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 90bea2d..b9fa43f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -629,6 +629,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * enclosed in quotes. Default is to only escape values containing a quote character.</li>
    * <li>`header` (default `false`): writes the names of columns as the first line.</li>
    * <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
+   * <li>`encoding` (by default it is not set): specifies encoding (charset) of saved csv
+   * files. If it is not set, the UTF-8 charset will be used.</li>
    * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
    * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
    * `snappy` and `deflate`). </li>
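
The new `encoding` entry slots in alongside the writer options documented above. A hedged sketch of how it composes with its neighbours (`df` stands for any DataFrame and the path is hypothetical, not taken from this patch):

```scala
// Sketch only: `df` is an arbitrary DataFrame; the path is illustrative.
df.write
  .option("header", "true")       // column names as the first line
  .option("nullValue", "NULL")    // render nulls as the literal string NULL
  .option("encoding", "utf-16")   // encode the output files as UTF-16
  .csv("/tmp/csv-utf16-demo")
```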

http://git-wip-us.apache.org/repos/asf/spark/blob/78e0a725/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index aeb40e5..d59b982 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.csv
 
+import java.nio.charset.Charset
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce._
@@ -168,7 +170,9 @@ private[csv] class CsvOutputWriter(
     context: TaskAttemptContext,
     params: CSVOptions) extends OutputWriter with Logging {
 
-  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path))
+  private val charset = Charset.forName(params.charset)
+
+  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
 
   private val gen = new UnivocityGenerator(dataSchema, writer, params)
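
The writer change above resolves the configured charset eagerly through `java.nio.charset.Charset.forName`, which is what makes an invalid name fail the write job (surfacing as a `SparkException` whose cause names the bad charset, as the new test below asserts). A standalone sketch of that JDK behaviour:

```scala
import java.nio.charset.{Charset, UnsupportedCharsetException}

// Canonical names and aliases are matched case-insensitively.
val utf8 = Charset.forName("utf-8")        // resolves to UTF-8
val latin = Charset.forName("ISO-8859-1")  // resolves to ISO-8859-1

// A syntactically legal but unknown name fails fast; the exception
// message is the offending charset name.
try {
  Charset.forName("1-9588-osi")
} catch {
  case e: UnsupportedCharsetException => println(s"unsupported: ${e.getMessage}")
}
```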
 

http://git-wip-us.apache.org/repos/asf/spark/blob/78e0a725/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 63cc598..456b453 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.sql.execution.datasources.csv
 
 import java.io.File
-import java.nio.charset.UnsupportedCharsetException
+import java.nio.charset.{Charset, UnsupportedCharsetException}
+import java.nio.file.Files
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.Locale
 
 import scala.collection.JavaConverters._
+import scala.util.Properties
 
 import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.hadoop.io.SequenceFile.CompressionType
@@ -514,6 +516,41 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     }
   }
 
+  test("SPARK-19018: Save csv with custom charset") {
+
+    // scalastyle:off nonascii
+    val content = "µß áâä ÁÂÄ"
+    // scalastyle:on nonascii
+
+    Seq("iso-8859-1", "utf-8", "utf-16", "utf-32", "windows-1250").foreach { encoding =>
+      withTempPath { path =>
+        val csvDir = new File(path, "csv")
+        Seq(content).toDF().write
+          .option("encoding", encoding)
+          .csv(csvDir.getCanonicalPath)
+
+        csvDir.listFiles().filter(_.getName.endsWith("csv")).foreach({ csvFile =>
+          val readback = Files.readAllBytes(csvFile.toPath)
+          val expected = (content + Properties.lineSeparator).getBytes(Charset.forName(encoding))
+          assert(readback === expected)
+        })
+      }
+    }
+  }
+
+  test("SPARK-19018: error handling for unsupported charsets") {
+    val exception = intercept[SparkException] {
+      withTempPath { path =>
+        val csvDir = new File(path, "csv").getCanonicalPath
+        Seq("a,A,c,A,b,B").toDF().write
+          .option("encoding", "1-9588-osi")
+          .csv(csvDir)
+      }
+    }
+
+    assert(exception.getCause.getMessage.contains("1-9588-osi"))
+  }
+
   test("commented lines in CSV data") {
     Seq("false", "true").foreach { multiLine =>
 


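To complement the tests above, a hedged round-trip sketch: the CSV reader documents its own `encoding` option for decoding input, so files written with a custom charset can in principle be read back by naming the same charset. The paths and the `spark` session are illustrative, and read-side charset handling may vary by parse mode and Spark version.

```scala
// Assumes `spark` and `import spark.implicits._` as in the earlier sketch.
val out = "/tmp/csv-roundtrip-demo"

Seq("µß áâä").toDF("value")
  .write
  .option("encoding", "iso-8859-1")
  .csv(out)

// Read back by naming the same charset on the reader.
val back = spark.read
  .option("encoding", "iso-8859-1")
  .csv(out)
back.show()
```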