You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sh...@apache.org on 2015/12/16 19:34:33 UTC
spark git commit: [SPARK-12310][SPARKR] Add write.json and
write.parquet for SparkR
Repository: spark
Updated Branches:
refs/heads/master 2eb5af5f0 -> 22f6cd86f
[SPARK-12310][SPARKR] Add write.json and write.parquet for SparkR
Add ```write.json``` and ```write.parquet``` for SparkR, and deprecated ```saveAsParquetFile```.
Author: Yanbo Liang <yb...@gmail.com>
Closes #10281 from yanboliang/spark-12310.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22f6cd86
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22f6cd86
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22f6cd86
Branch: refs/heads/master
Commit: 22f6cd86fc2e2d6f6ad2c3aae416732c46ebf1b1
Parents: 2eb5af5
Author: Yanbo Liang <yb...@gmail.com>
Authored: Wed Dec 16 10:34:30 2015 -0800
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Wed Dec 16 10:34:30 2015 -0800
----------------------------------------------------------------------
R/pkg/NAMESPACE | 4 +-
R/pkg/R/DataFrame.R | 51 ++++++++++--
R/pkg/R/generics.R | 16 +++-
R/pkg/inst/tests/testthat/test_sparkSQL.R | 104 ++++++++++++++-----------
4 files changed, 119 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/22f6cd86/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index cab39d6..ccc01fe 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -92,7 +92,9 @@ exportMethods("arrange",
"with",
"withColumn",
"withColumnRenamed",
- "write.df")
+ "write.df",
+ "write.json",
+ "write.parquet")
exportClasses("Column")
http://git-wip-us.apache.org/repos/asf/spark/blob/22f6cd86/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 380a13f..0cfa12b9 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -596,17 +596,44 @@ setMethod("toJSON",
RDD(jrdd, serializedMode = "string")
})
-#' saveAsParquetFile
+#' write.json
+#'
+#' Save the contents of a DataFrame as a JSON file (one object per line). Files written out
+#' with this method can be read back in as a DataFrame using read.json().
+#'
+#' @param x A SparkSQL DataFrame
+#' @param path The directory where the file is saved
+#'
+#' @family DataFrame functions
+#' @rdname write.json
+#' @name write.json
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- read.json(sqlContext, path)
+#' write.json(df, "/tmp/sparkr-tmp/")
+#'}
+setMethod("write.json",
+ signature(x = "DataFrame", path = "character"),
+ function(x, path) {
+ write <- callJMethod(x@sdf, "write")
+ invisible(callJMethod(write, "json", path))
+ })
+
+#' write.parquet
#'
#' Save the contents of a DataFrame as a Parquet file, preserving the schema. Files written out
-#' with this method can be read back in as a DataFrame using parquetFile().
+#' with this method can be read back in as a DataFrame using read.parquet().
#'
#' @param x A SparkSQL DataFrame
#' @param path The directory where the file is saved
#'
#' @family DataFrame functions
-#' @rdname saveAsParquetFile
-#' @name saveAsParquetFile
+#' @rdname write.parquet
+#' @name write.parquet
#' @export
#' @examples
#'\dontrun{
@@ -614,12 +641,24 @@ setMethod("toJSON",
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- read.json(sqlContext, path)
-#' saveAsParquetFile(df, "/tmp/sparkr-tmp/")
+#' write.parquet(df, "/tmp/sparkr-tmp1/")
+#' saveAsParquetFile(df, "/tmp/sparkr-tmp2/")
#'}
+setMethod("write.parquet",
+ signature(x = "DataFrame", path = "character"),
+ function(x, path) {
+ write <- callJMethod(x@sdf, "write")
+ invisible(callJMethod(write, "parquet", path))
+ })
+
+#' @rdname write.parquet
+#' @name saveAsParquetFile
+#' @export
setMethod("saveAsParquetFile",
signature(x = "DataFrame", path = "character"),
function(x, path) {
- invisible(callJMethod(x@sdf, "saveAsParquetFile", path))
+ .Deprecated("write.parquet")
+ write.parquet(x, path)
})
#' Distinct
http://git-wip-us.apache.org/repos/asf/spark/blob/22f6cd86/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index c383e6e..62be2dd 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -519,10 +519,6 @@ setGeneric("sample_frac",
#' @export
setGeneric("sampleBy", function(x, col, fractions, seed) { standardGeneric("sampleBy") })
-#' @rdname saveAsParquetFile
-#' @export
-setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
-
#' @rdname saveAsTable
#' @export
setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
@@ -541,6 +537,18 @@ setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") })
#' @export
setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") })
+#' @rdname write.json
+#' @export
+setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
+
+#' @rdname write.parquet
+#' @export
+setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })
+
+#' @rdname write.parquet
+#' @export
+setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
+
#' @rdname schema
#' @export
setGeneric("schema", function(x) { standardGeneric("schema") })
http://git-wip-us.apache.org/repos/asf/spark/blob/22f6cd86/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 071fd31..135c757 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -371,22 +371,49 @@ test_that("Collect DataFrame with complex types", {
expect_equal(bob$height, 176.5)
})
-test_that("read.json()/jsonFile() on a local file returns a DataFrame", {
+test_that("read/write json files", {
+ # Test read.df
+ df <- read.df(sqlContext, jsonPath, "json")
+ expect_is(df, "DataFrame")
+ expect_equal(count(df), 3)
+
+ # Test read.df with a user defined schema
+ schema <- structType(structField("name", type = "string"),
+ structField("age", type = "double"))
+
+ df1 <- read.df(sqlContext, jsonPath, "json", schema)
+ expect_is(df1, "DataFrame")
+ expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))
+
+ # Test loadDF
+ df2 <- loadDF(sqlContext, jsonPath, "json", schema)
+ expect_is(df2, "DataFrame")
+ expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))
+
+ # Test read.json
df <- read.json(sqlContext, jsonPath)
expect_is(df, "DataFrame")
expect_equal(count(df), 3)
- # read.json()/jsonFile() works with multiple input paths
+
+ # Test write.df
jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".json")
write.df(df, jsonPath2, "json", mode="overwrite")
- jsonDF1 <- read.json(sqlContext, c(jsonPath, jsonPath2))
+
+ # Test write.json
+ jsonPath3 <- tempfile(pattern="jsonPath3", fileext=".json")
+ write.json(df, jsonPath3)
+
+ # Test read.json()/jsonFile() works with multiple input paths
+ jsonDF1 <- read.json(sqlContext, c(jsonPath2, jsonPath3))
expect_is(jsonDF1, "DataFrame")
expect_equal(count(jsonDF1), 6)
# Suppress warnings because jsonFile is deprecated
- jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath, jsonPath2)))
+ jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath2, jsonPath3)))
expect_is(jsonDF2, "DataFrame")
expect_equal(count(jsonDF2), 6)
unlink(jsonPath2)
+ unlink(jsonPath3)
})
test_that("jsonRDD() on a RDD with json string", {
@@ -454,6 +481,9 @@ test_that("insertInto() on a registered table", {
expect_equal(count(sql(sqlContext, "select * from table1")), 2)
expect_equal(first(sql(sqlContext, "select * from table1 order by age"))$name, "Bob")
dropTempTable(sqlContext, "table1")
+
+ unlink(jsonPath2)
+ unlink(parquetPath2)
})
test_that("table() returns a new DataFrame", {
@@ -848,33 +878,6 @@ test_that("column calculation", {
expect_equal(count(df2), 3)
})
-test_that("read.df() from json file", {
- df <- read.df(sqlContext, jsonPath, "json")
- expect_is(df, "DataFrame")
- expect_equal(count(df), 3)
-
- # Check if we can apply a user defined schema
- schema <- structType(structField("name", type = "string"),
- structField("age", type = "double"))
-
- df1 <- read.df(sqlContext, jsonPath, "json", schema)
- expect_is(df1, "DataFrame")
- expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))
-
- # Run the same with loadDF
- df2 <- loadDF(sqlContext, jsonPath, "json", schema)
- expect_is(df2, "DataFrame")
- expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))
-})
-
-test_that("write.df() as parquet file", {
- df <- read.df(sqlContext, jsonPath, "json")
- write.df(df, parquetPath, "parquet", mode="overwrite")
- df2 <- read.df(sqlContext, parquetPath, "parquet")
- expect_is(df2, "DataFrame")
- expect_equal(count(df2), 3)
-})
-
test_that("test HiveContext", {
ssc <- callJMethod(sc, "sc")
hiveCtx <- tryCatch({
@@ -895,6 +898,8 @@ test_that("test HiveContext", {
df3 <- sql(hiveCtx, "select * from json2")
expect_is(df3, "DataFrame")
expect_equal(count(df3), 3)
+
+ unlink(jsonPath2)
})
test_that("column operators", {
@@ -1333,6 +1338,9 @@ test_that("join() and merge() on a DataFrame", {
expect_error(merge(df, df3),
paste("The following column name: name_y occurs more than once in the 'DataFrame'.",
"Please use different suffixes for the intersected columns.", sep = ""))
+
+ unlink(jsonPath2)
+ unlink(jsonPath3)
})
test_that("toJSON() returns an RDD of the correct values", {
@@ -1396,6 +1404,8 @@ test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", {
# Test base::intersect is working
expect_equal(length(intersect(1:20, 3:23)), 18)
+
+ unlink(jsonPath2)
})
test_that("withColumn() and withColumnRenamed()", {
@@ -1440,31 +1450,35 @@ test_that("mutate(), transform(), rename() and names()", {
detach(airquality)
})
-test_that("write.df() on DataFrame and works with read.parquet", {
- df <- read.json(sqlContext, jsonPath)
+test_that("read/write Parquet files", {
+ df <- read.df(sqlContext, jsonPath, "json")
+ # Test write.df and read.df
write.df(df, parquetPath, "parquet", mode="overwrite")
- parquetDF <- read.parquet(sqlContext, parquetPath)
- expect_is(parquetDF, "DataFrame")
- expect_equal(count(df), count(parquetDF))
-})
+ df2 <- read.df(sqlContext, parquetPath, "parquet")
+ expect_is(df2, "DataFrame")
+ expect_equal(count(df2), 3)
-test_that("read.parquet()/parquetFile() works with multiple input paths", {
- df <- read.json(sqlContext, jsonPath)
- write.df(df, parquetPath, "parquet", mode="overwrite")
+ # Test write.parquet/saveAsParquetFile and read.parquet/parquetFile
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
- write.df(df, parquetPath2, "parquet", mode="overwrite")
- parquetDF <- read.parquet(sqlContext, c(parquetPath, parquetPath2))
+ write.parquet(df, parquetPath2)
+ parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
+ suppressWarnings(saveAsParquetFile(df, parquetPath3))
+ parquetDF <- read.parquet(sqlContext, c(parquetPath2, parquetPath3))
expect_is(parquetDF, "DataFrame")
expect_equal(count(parquetDF), count(df) * 2)
- parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath, parquetPath2))
+ parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath2, parquetPath3))
expect_is(parquetDF2, "DataFrame")
expect_equal(count(parquetDF2), count(df) * 2)
# Test if varargs works with variables
saveMode <- "overwrite"
mergeSchema <- "true"
- parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
- write.df(df, parquetPath2, "parquet", mode = saveMode, mergeSchema = mergeSchema)
+ parquetPath4 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
+ write.df(df, parquetPath3, "parquet", mode = saveMode, mergeSchema = mergeSchema)
+
+ unlink(parquetPath2)
+ unlink(parquetPath3)
+ unlink(parquetPath4)
})
test_that("describe() and summarize() on a DataFrame", {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org