Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/29 18:44:32 UTC

spark git commit: [SPARK-13509][SPARK-13507][SQL] Support for writing CSV with a single function call

Repository: spark
Updated Branches:
  refs/heads/master 916fc34f9 -> 02aa499df


[SPARK-13509][SPARK-13507][SQL] Support for writing CSV with a single function call

https://issues.apache.org/jira/browse/SPARK-13507
https://issues.apache.org/jira/browse/SPARK-13509

## What changes were proposed in this pull request?
This PR adds support for writing CSV data directly to a given path with a single function call.
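
A minimal usage sketch of the new single-call API (PySpark; the path is illustrative, and `df`/`sqlContext` are assumed to already exist):

    # Write a DataFrame as CSV with one call, instead of
    # df.write.format('csv').save(path), and read it back the same way.
    df.write.csv('/tmp/ages_csv')
    df2 = sqlContext.read.csv('/tmp/ages_csv')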

Several unit tests were added for each piece of functionality.
## How was this patch tested?

This was tested with unit tests and with `dev/run-tests` for coding style.

Author: hyukjinkwon <gu...@gmail.com>
Author: Hyukjin Kwon <gu...@gmail.com>

Closes #11389 from HyukjinKwon/SPARK-13507-13509.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02aa499d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02aa499d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02aa499d

Branch: refs/heads/master
Commit: 02aa499dfb71bc9571bebb79e6383842e4f48143
Parents: 916fc34
Author: hyukjinkwon <gu...@gmail.com>
Authored: Mon Feb 29 09:44:29 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Feb 29 09:44:29 2016 -0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                | 50 ++++++++++++++++++++
 python/test_support/sql/ages.csv                |  4 ++
 .../org/apache/spark/sql/DataFrameWriter.scala  | 23 +++++++++
 .../datasources/json/JSONOptions.scala          |  5 +-
 .../datasources/text/DefaultSource.scala        |  5 +-
 .../execution/datasources/csv/CSVSuite.scala    |  3 +-
 6 files changed, 80 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/02aa499d/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index b1453c6..7f5368d 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -233,6 +233,23 @@ class DataFrameReader(object):
             paths = [paths]
         return self._df(self._jreader.text(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
 
+    @since(2.0)
+    def csv(self, paths):
+        """Loads a CSV file and returns the result as a [[DataFrame]].
+
+        This function goes through the input once to determine the input schema. To avoid going
+        through the entire data once, specify the schema explicitly using [[schema]].
+
+        :param paths: string, or list of strings, for input path(s).
+
+        >>> df = sqlContext.read.csv('python/test_support/sql/ages.csv')
+        >>> df.dtypes
+        [('C0', 'string'), ('C1', 'string')]
+        """
+        if isinstance(paths, basestring):
+            paths = [paths]
+        return self._df(self._jreader.csv(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
+
     @since(1.5)
     def orc(self, path):
         """Loads an ORC file, returning the result as a :class:`DataFrame`.
@@ -448,6 +465,11 @@ class DataFrameWriter(object):
             * ``ignore``: Silently ignore this operation if data already exists.
             * ``error`` (default case): Throw an exception if data already exists.
 
+        You can set the following JSON-specific option(s) for writing JSON files:
+            * ``compression`` (default ``None``): compression codec to use when saving to file.
+            This can be one of the known case-insensitive shortened names
+            (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
+
         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)._jwrite.json(path)
@@ -476,11 +498,39 @@ class DataFrameWriter(object):
     def text(self, path):
         """Saves the content of the DataFrame in a text file at the specified path.
 
+        :param path: the path in any Hadoop supported file system
+
         The DataFrame must have only one column that is of string type.
         Each row becomes a new line in the output file.
+
+        You can set the following option(s) for writing text files:
+            * ``compression`` (default ``None``): compression codec to use when saving to file.
+            This can be one of the known case-insensitive shortened names
+            (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
         """
         self._jwrite.text(path)
 
+    @since(2.0)
+    def csv(self, path, mode=None):
+        """Saves the content of the [[DataFrame]] in CSV format at the specified path.
+
+        :param path: the path in any Hadoop supported file system
+        :param mode: specifies the behavior of the save operation when data already exists.
+
+            * ``append``: Append contents of this :class:`DataFrame` to existing data.
+            * ``overwrite``: Overwrite existing data.
+            * ``ignore``: Silently ignore this operation if data already exists.
+            * ``error`` (default case): Throw an exception if data already exists.
+
+        You can set the following CSV-specific option(s) for writing CSV files:
+            * ``compression`` (default ``None``): compression codec to use when saving to file.
+            This can be one of the known case-insensitive shortened names
+            (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
+
+        >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
+        """
+        self.mode(mode)._jwrite.csv(path)
+
     @since(1.5)
     def orc(self, path, mode=None, partitionBy=None):
         """Saves the content of the :class:`DataFrame` in ORC format at the specified path.

http://git-wip-us.apache.org/repos/asf/spark/blob/02aa499d/python/test_support/sql/ages.csv
----------------------------------------------------------------------
diff --git a/python/test_support/sql/ages.csv b/python/test_support/sql/ages.csv
new file mode 100644
index 0000000..18991fe
--- /dev/null
+++ b/python/test_support/sql/ages.csv
@@ -0,0 +1,4 @@
+Joe,20
+Tom,30
+Hyukjin,25
+

http://git-wip-us.apache.org/repos/asf/spark/blob/02aa499d/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index d6bdd3d..093504c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -453,6 +453,10 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    *   format("json").save(path)
    * }}}
    *
+   * You can set the following JSON-specific option(s) for writing JSON files:
+   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
+   * one of the known case-insensitive shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`).</li>
+   *
    * @since 1.4.0
    */
   def json(path: String): Unit = format("json").save(path)
@@ -492,10 +496,29 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    *   df.write().text("/path/to/output")
    * }}}
    *
+   * You can set the following option(s) for writing text files:
+   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
+   * one of the known case-insensitive shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`).</li>
+   *
    * @since 1.6.0
    */
   def text(path: String): Unit = format("text").save(path)
 
+  /**
+   * Saves the content of the [[DataFrame]] in CSV format at the specified path.
+   * This is equivalent to:
+   * {{{
+   *   format("csv").save(path)
+   * }}}
+   *
+   * You can set the following CSV-specific option(s) for writing CSV files:
+   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
+   * one of the known case-insensitive shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`).</li>
+   *
+   * @since 2.0.0
+   */
+  def csv(path: String): Unit = format("csv").save(path)
+
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
   ///////////////////////////////////////////////////////////////////////////////////////
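
For reference, the new writer method composes with the generic `option` API, so the `compression` option documented above can be set per write; a sketch in PySpark (the path and codec choice are illustrative):

    # 'compression' is the writer option documented in this patch; 'gzip'
    # is one of the supported short names listed above.
    df.write.option('compression', 'gzip').csv('/tmp/cars_csv')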

http://git-wip-us.apache.org/repos/asf/spark/blob/02aa499d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index 31a95ed..e59dbd6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -48,10 +48,7 @@ private[sql] class JSONOptions(
     parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
   val allowBackslashEscapingAnyCharacter =
     parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
-  val compressionCodec = {
-    val name = parameters.get("compression").orElse(parameters.get("codec"))
-    name.map(CompressionCodecs.getCodecClassName)
-  }
+  val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
 
   /** Sets config options on a Jackson [[JsonFactory]]. */
   def setJacksonOptions(factory: JsonFactory): Unit = {
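
Note that this change drops the old `codec` alias, so only the `compression` key is read. A sketch of the surviving spelling (PySpark, illustrative path):

    # Only 'compression' is read after this change; the old 'codec'
    # alias is no longer consulted.
    df.write.option('compression', 'bzip2').json('/tmp/out_json')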

http://git-wip-us.apache.org/repos/asf/spark/blob/02aa499d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 60155b3..8f3f633 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -115,10 +115,7 @@ private[sql] class TextRelation(
   /** Write path. */
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
     val conf = job.getConfiguration
-    val compressionCodec = {
-      val name = parameters.get("compression").orElse(parameters.get("codec"))
-      name.map(CompressionCodecs.getCodecClassName)
-    }
+    val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
     compressionCodec.foreach { codec =>
       CompressionCodecs.setCodecConfiguration(conf, codec)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/02aa499d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 5d57d77..3ecbb14 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -268,9 +268,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         .load(testFile(carsFile))
 
       cars.coalesce(1).write
-        .format("csv")
         .option("header", "true")
-        .save(csvDir)
+        .csv(csvDir)
 
       val carsCopy = sqlContext.read
         .format("csv")

