You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sh...@apache.org on 2015/12/16 19:34:33 UTC

spark git commit: [SPARK-12310][SPARKR] Add write.json and write.parquet for SparkR

Repository: spark
Updated Branches:
  refs/heads/master 2eb5af5f0 -> 22f6cd86f


[SPARK-12310][SPARKR] Add write.json and write.parquet for SparkR

Add ```write.json``` and ```write.parquet``` for SparkR, and deprecated ```saveAsParquetFile```.

Author: Yanbo Liang <yb...@gmail.com>

Closes #10281 from yanboliang/spark-12310.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22f6cd86
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22f6cd86
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22f6cd86

Branch: refs/heads/master
Commit: 22f6cd86fc2e2d6f6ad2c3aae416732c46ebf1b1
Parents: 2eb5af5
Author: Yanbo Liang <yb...@gmail.com>
Authored: Wed Dec 16 10:34:30 2015 -0800
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Wed Dec 16 10:34:30 2015 -0800

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |   4 +-
 R/pkg/R/DataFrame.R                       |  51 ++++++++++--
 R/pkg/R/generics.R                        |  16 +++-
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 104 ++++++++++++++-----------
 4 files changed, 119 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/22f6cd86/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index cab39d6..ccc01fe 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -92,7 +92,9 @@ exportMethods("arrange",
               "with",
               "withColumn",
               "withColumnRenamed",
-              "write.df")
+              "write.df",
+              "write.json",
+              "write.parquet")
 
 exportClasses("Column")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/22f6cd86/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 380a13f..0cfa12b9 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -596,17 +596,44 @@ setMethod("toJSON",
             RDD(jrdd, serializedMode = "string")
           })
 
-#' saveAsParquetFile
+#' write.json
+#'
+#' Save the contents of a DataFrame as a JSON file (one object per line). Files written out
+#' with this method can be read back in as a DataFrame using read.json().
+#'
+#' @param x A SparkSQL DataFrame
+#' @param path The directory where the file is saved
+#'
+#' @family DataFrame functions
+#' @rdname write.json
+#' @name write.json
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- read.json(sqlContext, path)
+#' write.json(df, "/tmp/sparkr-tmp/")
+#'}
+setMethod("write.json",
+          signature(x = "DataFrame", path = "character"),
+          function(x, path) {
+            write <- callJMethod(x@sdf, "write")
+            invisible(callJMethod(write, "json", path))
+          })
+
+#' write.parquet
 #'
 #' Save the contents of a DataFrame as a Parquet file, preserving the schema. Files written out
-#' with this method can be read back in as a DataFrame using parquetFile().
+#' with this method can be read back in as a DataFrame using read.parquet().
 #'
 #' @param x A SparkSQL DataFrame
 #' @param path The directory where the file is saved
 #'
 #' @family DataFrame functions
-#' @rdname saveAsParquetFile
-#' @name saveAsParquetFile
+#' @rdname write.parquet
+#' @name write.parquet
 #' @export
 #' @examples
 #'\dontrun{
@@ -614,12 +641,24 @@ setMethod("toJSON",
 #' sqlContext <- sparkRSQL.init(sc)
 #' path <- "path/to/file.json"
 #' df <- read.json(sqlContext, path)
-#' saveAsParquetFile(df, "/tmp/sparkr-tmp/")
+#' write.parquet(df, "/tmp/sparkr-tmp1/")
+#' saveAsParquetFile(df, "/tmp/sparkr-tmp2/")
 #'}
+setMethod("write.parquet",
+          signature(x = "DataFrame", path = "character"),
+          function(x, path) {
+            write <- callJMethod(x@sdf, "write")
+            invisible(callJMethod(write, "parquet", path))
+          })
+
+#' @rdname write.parquet
+#' @name saveAsParquetFile
+#' @export
 setMethod("saveAsParquetFile",
           signature(x = "DataFrame", path = "character"),
           function(x, path) {
-            invisible(callJMethod(x@sdf, "saveAsParquetFile", path))
+            .Deprecated("write.parquet")
+            write.parquet(x, path)
           })
 
 #' Distinct

http://git-wip-us.apache.org/repos/asf/spark/blob/22f6cd86/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index c383e6e..62be2dd 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -519,10 +519,6 @@ setGeneric("sample_frac",
 #' @export
 setGeneric("sampleBy", function(x, col, fractions, seed) { standardGeneric("sampleBy") })
 
-#' @rdname saveAsParquetFile
-#' @export
-setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
-
 #' @rdname saveAsTable
 #' @export
 setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
@@ -541,6 +537,18 @@ setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") })
 #' @export
 setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") })
 
+#' @rdname write.json
+#' @export
+setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
+
+#' @rdname write.parquet
+#' @export
+setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })
+
+#' @rdname write.parquet
+#' @export
+setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
+
 #' @rdname schema
 #' @export
 setGeneric("schema", function(x) { standardGeneric("schema") })

http://git-wip-us.apache.org/repos/asf/spark/blob/22f6cd86/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 071fd31..135c757 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -371,22 +371,49 @@ test_that("Collect DataFrame with complex types", {
   expect_equal(bob$height, 176.5)
 })
 
-test_that("read.json()/jsonFile() on a local file returns a DataFrame", {
+test_that("read/write json files", {
+  # Test read.df
+  df <- read.df(sqlContext, jsonPath, "json")
+  expect_is(df, "DataFrame")
+  expect_equal(count(df), 3)
+
+  # Test read.df with a user defined schema
+  schema <- structType(structField("name", type = "string"),
+                       structField("age", type = "double"))
+
+  df1 <- read.df(sqlContext, jsonPath, "json", schema)
+  expect_is(df1, "DataFrame")
+  expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))
+
+  # Test loadDF
+  df2 <- loadDF(sqlContext, jsonPath, "json", schema)
+  expect_is(df2, "DataFrame")
+  expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))
+
+  # Test read.json
   df <- read.json(sqlContext, jsonPath)
   expect_is(df, "DataFrame")
   expect_equal(count(df), 3)
-  # read.json()/jsonFile() works with multiple input paths
+
+  # Test write.df
   jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".json")
   write.df(df, jsonPath2, "json", mode="overwrite")
-  jsonDF1 <- read.json(sqlContext, c(jsonPath, jsonPath2))
+
+  # Test write.json
+  jsonPath3 <- tempfile(pattern="jsonPath3", fileext=".json")
+  write.json(df, jsonPath3)
+
+  # Test read.json()/jsonFile() works with multiple input paths
+  jsonDF1 <- read.json(sqlContext, c(jsonPath2, jsonPath3))
   expect_is(jsonDF1, "DataFrame")
   expect_equal(count(jsonDF1), 6)
   # Suppress warnings because jsonFile is deprecated
-  jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath, jsonPath2)))
+  jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath2, jsonPath3)))
   expect_is(jsonDF2, "DataFrame")
   expect_equal(count(jsonDF2), 6)
 
   unlink(jsonPath2)
+  unlink(jsonPath3)
 })
 
 test_that("jsonRDD() on a RDD with json string", {
@@ -454,6 +481,9 @@ test_that("insertInto() on a registered table", {
   expect_equal(count(sql(sqlContext, "select * from table1")), 2)
   expect_equal(first(sql(sqlContext, "select * from table1 order by age"))$name, "Bob")
   dropTempTable(sqlContext, "table1")
+
+  unlink(jsonPath2)
+  unlink(parquetPath2)
 })
 
 test_that("table() returns a new DataFrame", {
@@ -848,33 +878,6 @@ test_that("column calculation", {
   expect_equal(count(df2), 3)
 })
 
-test_that("read.df() from json file", {
-  df <- read.df(sqlContext, jsonPath, "json")
-  expect_is(df, "DataFrame")
-  expect_equal(count(df), 3)
-
-  # Check if we can apply a user defined schema
-  schema <- structType(structField("name", type = "string"),
-                       structField("age", type = "double"))
-
-  df1 <- read.df(sqlContext, jsonPath, "json", schema)
-  expect_is(df1, "DataFrame")
-  expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))
-
-  # Run the same with loadDF
-  df2 <- loadDF(sqlContext, jsonPath, "json", schema)
-  expect_is(df2, "DataFrame")
-  expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))
-})
-
-test_that("write.df() as parquet file", {
-  df <- read.df(sqlContext, jsonPath, "json")
-  write.df(df, parquetPath, "parquet", mode="overwrite")
-  df2 <- read.df(sqlContext, parquetPath, "parquet")
-  expect_is(df2, "DataFrame")
-  expect_equal(count(df2), 3)
-})
-
 test_that("test HiveContext", {
   ssc <- callJMethod(sc, "sc")
   hiveCtx <- tryCatch({
@@ -895,6 +898,8 @@ test_that("test HiveContext", {
   df3 <- sql(hiveCtx, "select * from json2")
   expect_is(df3, "DataFrame")
   expect_equal(count(df3), 3)
+
+  unlink(jsonPath2)
 })
 
 test_that("column operators", {
@@ -1333,6 +1338,9 @@ test_that("join() and merge() on a DataFrame", {
   expect_error(merge(df, df3),
                paste("The following column name: name_y occurs more than once in the 'DataFrame'.",
                      "Please use different suffixes for the intersected columns.", sep = ""))
+
+  unlink(jsonPath2)
+  unlink(jsonPath3)
 })
 
 test_that("toJSON() returns an RDD of the correct values", {
@@ -1396,6 +1404,8 @@ test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", {
 
   # Test base::intersect is working
   expect_equal(length(intersect(1:20, 3:23)), 18)
+
+  unlink(jsonPath2)
 })
 
 test_that("withColumn() and withColumnRenamed()", {
@@ -1440,31 +1450,35 @@ test_that("mutate(), transform(), rename() and names()", {
   detach(airquality)
 })
 
-test_that("write.df() on DataFrame and works with read.parquet", {
-  df <- read.json(sqlContext, jsonPath)
+test_that("read/write Parquet files", {
+  df <- read.df(sqlContext, jsonPath, "json")
+  # Test write.df and read.df
   write.df(df, parquetPath, "parquet", mode="overwrite")
-  parquetDF <- read.parquet(sqlContext, parquetPath)
-  expect_is(parquetDF, "DataFrame")
-  expect_equal(count(df), count(parquetDF))
-})
+  df2 <- read.df(sqlContext, parquetPath, "parquet")
+  expect_is(df2, "DataFrame")
+  expect_equal(count(df2), 3)
 
-test_that("read.parquet()/parquetFile() works with multiple input paths", {
-  df <- read.json(sqlContext, jsonPath)
-  write.df(df, parquetPath, "parquet", mode="overwrite")
+  # Test write.parquet/saveAsParquetFile and read.parquet/parquetFile
   parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
-  write.df(df, parquetPath2, "parquet", mode="overwrite")
-  parquetDF <- read.parquet(sqlContext, c(parquetPath, parquetPath2))
+  write.parquet(df, parquetPath2)
+  parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
+  suppressWarnings(saveAsParquetFile(df, parquetPath3))
+  parquetDF <- read.parquet(sqlContext, c(parquetPath2, parquetPath3))
   expect_is(parquetDF, "DataFrame")
   expect_equal(count(parquetDF), count(df) * 2)
-  parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath, parquetPath2))
+  parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath2, parquetPath3))
   expect_is(parquetDF2, "DataFrame")
   expect_equal(count(parquetDF2), count(df) * 2)
 
   # Test if varargs works with variables
   saveMode <- "overwrite"
   mergeSchema <- "true"
-  parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
-  write.df(df, parquetPath2, "parquet", mode = saveMode, mergeSchema = mergeSchema)
+  parquetPath4 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
+  write.df(df, parquetPath3, "parquet", mode = saveMode, mergeSchema = mergeSchema)
+
+  unlink(parquetPath2)
+  unlink(parquetPath3)
+  unlink(parquetPath4)
 })
 
 test_that("describe() and summarize() on a DataFrame", {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org