Posted to commits@spark.apache.org by fe...@apache.org on 2016/10/07 18:34:54 UTC

spark git commit: [SPARK-17665][SPARKR] Support options/mode for all read/write APIs and options in other types

Repository: spark
Updated Branches:
  refs/heads/master bb1aaf28e -> 9d8ae853e


[SPARK-17665][SPARKR] Support options/mode for all read/write APIs and options in other types

## What changes were proposed in this pull request?

This PR includes the changes below:

  - Support `mode`/`options` in `read.parquet`, `write.parquet`, `read.orc`, `write.orc`, `read.text`, `write.text`, `read.json` and `write.json` APIs

  - Support other types (logical, numeric and string) as options for `write.df`, `read.df`, `read.parquet`, `write.parquet`, `read.orc`, `write.orc`, `read.text`, `write.text`, `read.json` and `write.json` (see the sketch after this list)
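
For illustration, a minimal sketch of the new surface (file paths here are hypothetical; the calls mirror the tests added below):

    # Save mode and writer options can now be passed directly
    write.parquet(df, "/tmp/people.parquet", mode = "overwrite", compression = "GZIP")

    # Option values may be logical or numeric, not only character
    df2 <- read.df("/tmp/people.csv", "csv", header = TRUE, inferSchema = TRUE)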

## How was this patch tested?

Unit tests in `test_sparkSQL.R` and `test_utils.R`.

Author: hyukjinkwon <gu...@gmail.com>

Closes #15239 from HyukjinKwon/SPARK-17665.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d8ae853
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d8ae853
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d8ae853

Branch: refs/heads/master
Commit: 9d8ae853ecc5600f5c2f69565b96d5c46a8c0048
Parents: bb1aaf2
Author: hyukjinkwon <gu...@gmail.com>
Authored: Fri Oct 7 11:34:49 2016 -0700
Committer: Felix Cheung <fe...@apache.org>
Committed: Fri Oct 7 11:34:49 2016 -0700

----------------------------------------------------------------------
 R/pkg/R/DataFrame.R                       | 43 ++++++++++-----
 R/pkg/R/SQLContext.R                      | 23 +++++---
 R/pkg/R/generics.R                        | 10 ++--
 R/pkg/R/utils.R                           | 22 ++++++++
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 75 ++++++++++++++++++++++++++
 R/pkg/inst/tests/testthat/test_utils.R    |  9 ++++
 6 files changed, 160 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9d8ae853/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 75861d5..801d2ed 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -55,6 +55,19 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
   .Object
 })
 
+#' Set options/mode and then return the write object
+#' @noRd
+setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
+    options <- varargsToStrEnv(...)
+    if (!is.null(path)) {
+      options[["path"]] <- path
+    }
+    jmode <- convertToJSaveMode(mode)
+    write <- callJMethod(write, "mode", jmode)
+    write <- callJMethod(write, "options", options)
+    write
+}
+
 #' @export
 #' @param sdf A Java object reference to the backing Scala DataFrame
 #' @param isCached TRUE if the SparkDataFrame is cached
@@ -727,6 +740,8 @@ setMethod("toJSON",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore'; the save mode (defaults to 'error')
+#' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
 #' @rdname write.json
@@ -743,8 +758,9 @@ setMethod("toJSON",
 #' @note write.json since 1.6.0
 setMethod("write.json",
           signature(x = "SparkDataFrame", path = "character"),
-          function(x, path) {
+          function(x, path, mode = "error", ...) {
             write <- callJMethod(x@sdf, "write")
+            write <- setWriteOptions(write, mode = mode, ...)
             invisible(callJMethod(write, "json", path))
           })
 
@@ -755,6 +771,8 @@ setMethod("write.json",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore'; the save mode (defaults to 'error')
+#' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
 #' @aliases write.orc,SparkDataFrame,character-method
@@ -771,8 +789,9 @@ setMethod("write.json",
 #' @note write.orc since 2.0.0
 setMethod("write.orc",
           signature(x = "SparkDataFrame", path = "character"),
-          function(x, path) {
+          function(x, path, mode = "error", ...) {
             write <- callJMethod(x@sdf, "write")
+            write <- setWriteOptions(write, mode = mode, ...)
             invisible(callJMethod(write, "orc", path))
           })
 
@@ -783,6 +802,8 @@ setMethod("write.orc",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore'; the save mode (defaults to 'error')
+#' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
 #' @rdname write.parquet
@@ -800,8 +821,9 @@ setMethod("write.orc",
 #' @note write.parquet since 1.6.0
 setMethod("write.parquet",
           signature(x = "SparkDataFrame", path = "character"),
-          function(x, path) {
+          function(x, path, mode = "error", ...) {
             write <- callJMethod(x@sdf, "write")
+            write <- setWriteOptions(write, mode = mode, ...)
             invisible(callJMethod(write, "parquet", path))
           })
 
@@ -825,6 +847,8 @@ setMethod("saveAsParquetFile",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore'; the save mode (defaults to 'error')
+#' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
 #' @aliases write.text,SparkDataFrame,character-method
@@ -841,8 +865,9 @@ setMethod("saveAsParquetFile",
 #' @note write.text since 2.0.0
 setMethod("write.text",
           signature(x = "SparkDataFrame", path = "character"),
-          function(x, path) {
+          function(x, path, mode = "error", ...) {
             write <- callJMethod(x@sdf, "write")
+            write <- setWriteOptions(write, mode = mode, ...)
             invisible(callJMethod(write, "text", path))
           })
 
@@ -2637,15 +2662,9 @@ setMethod("write.df",
             if (is.null(source)) {
               source <- getDefaultSqlSource()
             }
-            jmode <- convertToJSaveMode(mode)
-            options <- varargsToEnv(...)
-            if (!is.null(path)) {
-              options[["path"]] <- path
-            }
             write <- callJMethod(df@sdf, "write")
             write <- callJMethod(write, "format", source)
-            write <- callJMethod(write, "mode", jmode)
-            write <- callJMethod(write, "options", options)
+            write <- setWriteOptions(write, path = path, mode = mode, ...)
             write <- handledCallJMethod(write, "save")
           })
 
@@ -2701,7 +2720,7 @@ setMethod("saveAsTable",
               source <- getDefaultSqlSource()
             }
             jmode <- convertToJSaveMode(mode)
-            options <- varargsToEnv(...)
+            options <- varargsToStrEnv(...)
 
             write <- callJMethod(df@sdf, "write")
             write <- callJMethod(write, "format", source)
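
For context, the `setWriteOptions` helper above threads the mode and options through the JVM DataFrameWriter. A minimal sketch of the call chain a write now performs internally (`callJMethod`, `convertToJSaveMode` and `varargsToStrEnv` are SparkR-internal helpers shown in this diff):

    # Sketch: what write.json(df, path, mode = "overwrite", compression = "gzip")
    # resolves to under the hood
    write <- callJMethod(df@sdf, "write")
    jmode <- convertToJSaveMode("overwrite")
    write <- callJMethod(write, "mode", jmode)
    options <- varargsToStrEnv(compression = "gzip")  # values coerced to strings
    write <- callJMethod(write, "options", options)
    invisible(callJMethod(write, "json", path))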

http://git-wip-us.apache.org/repos/asf/spark/blob/9d8ae853/R/pkg/R/SQLContext.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index baa8782..0d6a229 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -328,6 +328,7 @@ setMethod("toDF", signature(x = "RDD"),
 #' It goes through the entire dataset once to determine the schema.
 #'
 #' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @param ... additional external data source specific named properties.
 #' @return SparkDataFrame
 #' @rdname read.json
 #' @export
@@ -341,11 +342,13 @@ setMethod("toDF", signature(x = "RDD"),
 #' @name read.json
 #' @method read.json default
 #' @note read.json since 1.6.0
-read.json.default <- function(path) {
+read.json.default <- function(path, ...) {
   sparkSession <- getSparkSession()
+  options <- varargsToStrEnv(...)
   # Allow the user to have a more flexible definition of the text file path
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sparkSession, "read")
+  read <- callJMethod(read, "options", options)
   sdf <- callJMethod(read, "json", paths)
   dataFrame(sdf)
 }
@@ -405,16 +408,19 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
 #' Loads an ORC file, returning the result as a SparkDataFrame.
 #'
 #' @param path Path of file to read.
+#' @param ... additional external data source specific named properties.
 #' @return SparkDataFrame
 #' @rdname read.orc
 #' @export
 #' @name read.orc
 #' @note read.orc since 2.0.0
-read.orc <- function(path) {
+read.orc <- function(path, ...) {
   sparkSession <- getSparkSession()
+  options <- varargsToStrEnv(...)
   # Allow the user to have a more flexible definition of the ORC file path
   path <- suppressWarnings(normalizePath(path))
   read <- callJMethod(sparkSession, "read")
+  read <- callJMethod(read, "options", options)
   sdf <- callJMethod(read, "orc", path)
   dataFrame(sdf)
 }
@@ -430,11 +436,13 @@ read.orc <- function(path) {
 #' @name read.parquet
 #' @method read.parquet default
 #' @note read.parquet since 1.6.0
-read.parquet.default <- function(path) {
+read.parquet.default <- function(path, ...) {
   sparkSession <- getSparkSession()
+  options <- varargsToStrEnv(...)
   # Allow the user to have a more flexible definition of the Parquet file path
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sparkSession, "read")
+  read <- callJMethod(read, "options", options)
   sdf <- callJMethod(read, "parquet", paths)
   dataFrame(sdf)
 }
@@ -467,6 +475,7 @@ parquetFile <- function(x, ...) {
 #' Each line in the text file is a new row in the resulting SparkDataFrame.
 #'
 #' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @param ... additional external data source specific named properties.
 #' @return SparkDataFrame
 #' @rdname read.text
 #' @export
@@ -479,11 +488,13 @@ parquetFile <- function(x, ...) {
 #' @name read.text
 #' @method read.text default
 #' @note read.text since 1.6.1
-read.text.default <- function(path) {
+read.text.default <- function(path, ...) {
   sparkSession <- getSparkSession()
+  options <- varargsToStrEnv(...)
   # Allow the user to have a more flexible definition of the text file path
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sparkSession, "read")
+  read <- callJMethod(read, "options", options)
   sdf <- callJMethod(read, "text", paths)
   dataFrame(sdf)
 }
@@ -779,7 +790,7 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.string
          "in 'spark.sql.sources.default' configuration by default.")
   }
   sparkSession <- getSparkSession()
-  options <- varargsToEnv(...)
+  options <- varargsToStrEnv(...)
   if (!is.null(path)) {
     options[["path"]] <- path
   }
@@ -842,7 +853,7 @@ loadDF <- function(x = NULL, ...) {
 #' @note createExternalTable since 1.4.0
 createExternalTable.default <- function(tableName, path = NULL, source = NULL, ...) {
   sparkSession <- getSparkSession()
-  options <- varargsToEnv(...)
+  options <- varargsToStrEnv(...)
   if (!is.null(path)) {
     options[["path"]] <- path
   }
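
The read functions gain the same pass-through. A hedged example (primitivesAsString is a standard Spark JSON source option; paths are hypothetical):

    # Reader options flow to the underlying DataFrameReader; logical values
    # are coerced to "true"/"false" by varargsToStrEnv
    people <- read.json("/tmp/people.json", primitivesAsString = TRUE)
    cars <- read.df("/tmp/cars.csv", "csv", header = TRUE, inferSchema = TRUE)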

http://git-wip-us.apache.org/repos/asf/spark/blob/9d8ae853/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 90a02e2..810aea9 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -651,15 +651,17 @@ setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {
 
 #' @rdname write.json
 #' @export
-setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
+setGeneric("write.json", function(x, path, ...) { standardGeneric("write.json") })
 
 #' @rdname write.orc
 #' @export
-setGeneric("write.orc", function(x, path) { standardGeneric("write.orc") })
+setGeneric("write.orc", function(x, path, ...) { standardGeneric("write.orc") })
 
 #' @rdname write.parquet
 #' @export
-setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })
+setGeneric("write.parquet", function(x, path, ...) {
+  standardGeneric("write.parquet")
+})
 
 #' @rdname write.parquet
 #' @export
@@ -667,7 +669,7 @@ setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParqu
 
 #' @rdname write.text
 #' @export
-setGeneric("write.text", function(x, path) { standardGeneric("write.text") })
+setGeneric("write.text", function(x, path, ...) { standardGeneric("write.text") })
 
 #' @rdname schema
 #' @export

http://git-wip-us.apache.org/repos/asf/spark/blob/9d8ae853/R/pkg/R/utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index e696664..fa8bb0f 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -334,6 +334,28 @@ varargsToEnv <- function(...) {
   env
 }
 
+# Utility function to capture the varargs in an environment object, with all values
+# converted to strings.
+varargsToStrEnv <- function(...) {
+  pairs <- list(...)
+  env <- new.env()
+  for (name in names(pairs)) {
+    value <- pairs[[name]]
+    if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
+      stop(paste0("Unsupported type for ", name, " : ", class(value),
+           ". Supported types are logical, numeric, character and NULL."))
+    }
+    if (is.logical(value)) {
+      env[[name]] <- tolower(as.character(value))
+    } else if (is.null(value)) {
+      env[[name]] <- value
+    } else {
+      env[[name]] <- as.character(value)
+    }
+  }
+  env
+}
+
 getStorageLevel <- function(newLevel = c("DISK_ONLY",
                                          "DISK_ONLY_2",
                                          "MEMORY_AND_DISK",

http://git-wip-us.apache.org/repos/asf/spark/blob/9d8ae853/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index f5ab601..6d8cfad 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -256,6 +256,23 @@ test_that("read/write csv as DataFrame", {
   unlink(csvPath2)
 })
 
+test_that("Support other types for options", {
+  csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
+  mockLinesCsv <- c("year,make,model,comment,blank",
+  "\"2012\",\"Tesla\",\"S\",\"No comment\",",
+  "1997,Ford,E350,\"Go get one now they are going fast\",",
+  "2015,Chevy,Volt",
+  "NA,Dummy,Placeholder")
+  writeLines(mockLinesCsv, csvPath)
+
+  csvDf <- read.df(csvPath, "csv", header = "true", inferSchema = "true")
+  expected <- read.df(csvPath, "csv", header = TRUE, inferSchema = TRUE)
+  expect_equal(collect(csvDf), collect(expected))
+
+  expect_error(read.df(csvPath, "csv", header = TRUE, maxColumns = 3))
+  unlink(csvPath)
+})
+
 test_that("convert NAs to null type in DataFrames", {
   rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L)))
   df <- createDataFrame(rdd, list("a", "b"))
@@ -497,6 +514,19 @@ test_that("read/write json files", {
   unlink(jsonPath3)
 })
 
+test_that("read/write json files - compression option", {
+  df <- read.df(jsonPath, "json")
+
+  jsonPath <- tempfile(pattern = "jsonPath", fileext = ".json")
+  write.json(df, jsonPath, compression = "gzip")
+  jsonDF <- read.json(jsonPath)
+  expect_is(jsonDF, "SparkDataFrame")
+  expect_equal(count(jsonDF), count(df))
+  expect_true(length(list.files(jsonPath, pattern = ".gz")) > 0)
+
+  unlink(jsonPath)
+})
+
 test_that("jsonRDD() on a RDD with json string", {
   sqlContext <- suppressWarnings(sparkRSQL.init(sc))
   rdd <- parallelize(sc, mockLines)
@@ -1786,6 +1816,21 @@ test_that("read/write ORC files", {
   unsetHiveContext()
 })
 
+test_that("read/write ORC files - compression option", {
+  setHiveContext(sc)
+  df <- read.df(jsonPath, "json")
+
+  orcPath2 <- tempfile(pattern = "orcPath2", fileext = ".orc")
+  write.orc(df, orcPath2, compression = "ZLIB")
+  orcDF <- read.orc(orcPath2)
+  expect_is(orcDF, "SparkDataFrame")
+  expect_equal(count(orcDF), count(df))
+  expect_true(length(list.files(orcPath2, pattern = ".zlib.orc")) > 0)
+
+  unlink(orcPath2)
+  unsetHiveContext()
+})
+
 test_that("read/write Parquet files", {
   df <- read.df(jsonPath, "json")
   # Test write.df and read.df
@@ -1817,6 +1862,23 @@ test_that("read/write Parquet files", {
   unlink(parquetPath4)
 })
 
+test_that("read/write Parquet files - compression option/mode", {
+  df <- read.df(jsonPath, "json")
+  tempPath <- tempfile(pattern = "tempPath", fileext = ".parquet")
+
+  # Test write.df and read.df
+  write.parquet(df, tempPath, compression = "GZIP")
+  df2 <- read.parquet(tempPath)
+  expect_is(df2, "SparkDataFrame")
+  expect_equal(count(df2), 3)
+  expect_true(length(list.files(tempPath, pattern = ".gz.parquet")) > 0)
+
+  write.parquet(df, tempPath, mode = "overwrite")
+  df3 <- read.parquet(tempPath)
+  expect_is(df3, "SparkDataFrame")
+  expect_equal(count(df3), 3)
+})
+
 test_that("read/write text files", {
   # Test write.df and read.df
   df <- read.df(jsonPath, "text")
@@ -1838,6 +1900,19 @@ test_that("read/write text files", {
   unlink(textPath2)
 })
 
+test_that("read/write text files - compression option", {
+  df <- read.df(jsonPath, "text")
+
+  textPath <- tempfile(pattern = "textPath", fileext = ".txt")
+  write.text(df, textPath, compression = "GZIP")
+  textDF <- read.text(textPath)
+  expect_is(textDF, "SparkDataFrame")
+  expect_equal(count(textDF), count(df))
+  expect_true(length(list.files(textPath, pattern = ".gz")) > 0)
+
+  unlink(textPath)
+})
+
 test_that("describe() and summarize() on a DataFrame", {
   df <- read.json(jsonPath)
   stats <- describe(df, "age")

http://git-wip-us.apache.org/repos/asf/spark/blob/9d8ae853/R/pkg/inst/tests/testthat/test_utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R
index 69ed554..a20254e 100644
--- a/R/pkg/inst/tests/testthat/test_utils.R
+++ b/R/pkg/inst/tests/testthat/test_utils.R
@@ -217,4 +217,13 @@ test_that("rbindRaws", {
 
 })
 
+test_that("varargsToStrEnv", {
+  strenv <- varargsToStrEnv(a = 1, b = 1.1, c = TRUE, d = "abcd")
+  env <- varargsToEnv(a = "1", b = "1.1", c = "true", d = "abcd")
+  expect_equal(strenv, env)
+  expect_error(varargsToStrEnv(a = list(1, "a")),
+               paste0("Unsupported type for a : list. Supported types are logical, ",
+                      "numeric, character and NULL."))
+})
+
 sparkR.session.stop()

