Posted to commits@spark.apache.org by sh...@apache.org on 2015/05/13 08:52:37 UTC

spark git commit: [SPARK-7482] [SPARKR] Rename some DataFrame API methods in SparkR to match their counterparts in Scala.

Repository: spark
Updated Branches:
  refs/heads/master 208b90225 -> df9b94a57


[SPARK-7482] [SPARKR] Rename some DataFrame API methods in SparkR to match their counterparts in Scala.

Author: Sun Rui <ru...@intel.com>

Closes #6007 from sun-rui/SPARK-7482 and squashes the following commits:

5c5cf5e [Sun Rui] Implement alias loadDF() as a new function.
3a30c10 [Sun Rui] Rename load()/save() to read.df()/write.df(). Also add loadDF()/saveDF() as aliases.
9f569d6 [Sun Rui] [SPARK-7482][SparkR] Rename some DataFrame API methods in SparkR to match their counterparts in Scala.
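
For readers skimming the diff below, a minimal usage sketch of the renamed API surface after this commit; the session setup mirrors the package examples, and the file paths are illustrative:

  sc <- sparkR.init()
  sqlCtx <- sparkRSQL.init(sc)

  df <- read.df(sqlCtx, "path/to/people.json", "json")                 # was load()/loadDF()
  sampled <- sample(df, withReplacement = FALSE, fraction = 0.5)       # was sampleDF()
  write.df(sampled, "path/to/people.parquet", "parquet", "overwrite")  # was save()/saveDF()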


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df9b94a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df9b94a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df9b94a5

Branch: refs/heads/master
Commit: df9b94a57cbd0e028228059d215b446d59d25ba8
Parents: 208b902
Author: Sun Rui <ru...@intel.com>
Authored: Tue May 12 23:52:30 2015 -0700
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Tue May 12 23:52:30 2015 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                  |  6 ++++--
 R/pkg/R/DataFrame.R              | 35 ++++++++++++++++++------------
 R/pkg/R/RDD.R                    |  4 ++--
 R/pkg/R/SQLContext.R             | 13 +++++++++---
 R/pkg/R/generics.R               | 22 +++++++++++--------
 R/pkg/inst/tests/test_sparkSQL.R | 40 +++++++++++++++++------------------
 6 files changed, 71 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/df9b94a5/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 819e9a2..ba29614 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -37,7 +37,7 @@ exportMethods("arrange",
               "registerTempTable",
               "rename",
               "repartition",
-              "sampleDF",
+              "sample",
               "sample_frac",
               "saveAsParquetFile",
               "saveAsTable",
@@ -53,7 +53,8 @@ exportMethods("arrange",
               "unpersist",
               "where",
               "withColumn",
-              "withColumnRenamed")
+              "withColumnRenamed",
+              "write.df")
 
 exportClasses("Column")
 
@@ -101,6 +102,7 @@ export("cacheTable",
        "jsonFile",
        "loadDF",
        "parquetFile",
+       "read.df",
        "sql",
        "table",
        "tableNames",

http://git-wip-us.apache.org/repos/asf/spark/blob/df9b94a5/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 2705817..a7fa32e 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -294,8 +294,8 @@ setMethod("registerTempTable",
 #'\dontrun{
 #' sc <- sparkR.init()
 #' sqlCtx <- sparkRSQL.init(sc)
-#' df <- loadDF(sqlCtx, path, "parquet")
-#' df2 <- loadDF(sqlCtx, path2, "parquet")
+#' df <- read.df(sqlCtx, path, "parquet")
+#' df2 <- read.df(sqlCtx, path2, "parquet")
 #' registerTempTable(df, "table1")
 #' insertInto(df2, "table1", overwrite = TRUE)
 #'}
@@ -473,14 +473,14 @@ setMethod("distinct",
             dataFrame(sdf)
           })
 
-#' SampleDF
+#' Sample
 #'
 #' Return a sampled subset of this DataFrame using a random seed.
 #'
 #' @param x A SparkSQL DataFrame
 #' @param withReplacement Sampling with replacement or not
 #' @param fraction The (rough) sample target fraction
-#' @rdname sampleDF
+#' @rdname sample
 #' @aliases sample_frac
 #' @export
 #' @examples
@@ -489,10 +489,10 @@ setMethod("distinct",
 #' sqlCtx <- sparkRSQL.init(sc)
 #' path <- "path/to/file.json"
 #' df <- jsonFile(sqlCtx, path)
-#' collect(sampleDF(df, FALSE, 0.5)) 
-#' collect(sampleDF(df, TRUE, 0.5))
+#' collect(sample(df, FALSE, 0.5)) 
+#' collect(sample(df, TRUE, 0.5))
 #'}
-setMethod("sampleDF",
+setMethod("sample",
           # TODO : Figure out how to send integer as java.lang.Long to JVM so
           # we can send seed as an argument through callJMethod
           signature(x = "DataFrame", withReplacement = "logical",
@@ -503,13 +503,13 @@ setMethod("sampleDF",
             dataFrame(sdf)
           })
 
-#' @rdname sampleDF
-#' @aliases sampleDF
+#' @rdname sample
+#' @aliases sample
 setMethod("sample_frac",
           signature(x = "DataFrame", withReplacement = "logical",
                     fraction = "numeric"),
           function(x, withReplacement, fraction) {
-            sampleDF(x, withReplacement, fraction)
+            sample(x, withReplacement, fraction)
           })
 
 #' Count
@@ -1303,7 +1303,7 @@ setMethod("except",
 #' @param source A name for external data source
 #' @param mode One of 'append', 'overwrite', 'error', 'ignore'
 #'
-#' @rdname saveAsTable
+#' @rdname write.df 
 #' @export
 #' @examples
 #'\dontrun{
@@ -1311,9 +1311,9 @@ setMethod("except",
 #' sqlCtx <- sparkRSQL.init(sc)
 #' path <- "path/to/file.json"
 #' df <- jsonFile(sqlCtx, path)
-#' saveAsTable(df, "myfile")
+#' write.df(df, "myfile", "parquet", "overwrite")
 #' }
-setMethod("saveDF",
+setMethod("write.df",
           signature(df = "DataFrame", path = 'character', source = 'character',
                     mode = 'character'),
           function(df, path = NULL, source = NULL, mode = "append", ...){
@@ -1334,6 +1334,15 @@ setMethod("saveDF",
             callJMethod(df@sdf, "save", source, jmode, options)
           })
 
+#' @rdname write.df
+#' @aliases saveDF
+#' @export
+setMethod("saveDF",
+          signature(df = "DataFrame", path = 'character', source = 'character',
+                    mode = 'character'),
+          function(df, path = NULL, source = NULL, mode = "append", ...){
+            write.df(df, path, source, mode, ...)
+          })
 
 #' saveAsTable
 #'
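
The hunk above keeps saveDF() as a thin alias that forwards straight to the new write.df(), so existing callers are unaffected. The two spellings are equivalent (df and parquetPath are placeholders):

  write.df(df, parquetPath, "parquet", "overwrite")  # new name, matching the Scala API
  saveDF(df, parquetPath, "parquet", "overwrite")    # old name; forwards to write.df()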

http://git-wip-us.apache.org/repos/asf/spark/blob/df9b94a5/R/pkg/R/RDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 9138629..d3a68ff 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -927,7 +927,7 @@ setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
                                                                     MAXINT)))))
 
             # TODO(zongheng): investigate if this call is an in-place shuffle?
-            sample(samples)[1:total]
+            base::sample(samples)[1:total]
           })
 
 # Creates tuples of the elements in this RDD by applying a function.
@@ -996,7 +996,7 @@ setMethod("coalesce",
              if (shuffle || numPartitions > SparkR:::numPartitions(x)) {
                func <- function(partIndex, part) {
                  set.seed(partIndex)  # partIndex as seed
-                 start <- as.integer(sample(numPartitions, 1) - 1)
+                 start <- as.integer(base::sample(numPartitions, 1) - 1)
                  lapply(seq_along(part),
                         function(i) {
                           pos <- (start + i) %% numPartitions
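
These two hunks qualify sample() with base:: because the package now defines its own S4 generic named sample (see generics.R below), which masks base::sample inside the package namespace. A minimal sketch of the masking problem, assuming the new generic is defined:

  setGeneric("sample", function(x, withReplacement, fraction, seed) {
    standardGeneric("sample")
  })

  sample(1:10, 3)        # error: no S4 method matches this signature
  base::sample(1:10, 3)  # explicit qualification always reaches base R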

http://git-wip-us.apache.org/repos/asf/spark/blob/df9b94a5/R/pkg/R/SQLContext.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index cae06e6..531442e 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -421,7 +421,7 @@ clearCache <- function(sqlCtx) {
 #' \dontrun{
 #' sc <- sparkR.init()
 #' sqlCtx <- sparkRSQL.init(sc)
-#' df <- loadDF(sqlCtx, path, "parquet")
+#' df <- read.df(sqlCtx, path, "parquet")
 #' registerTempTable(df, "table")
 #' dropTempTable(sqlCtx, "table")
 #' }
@@ -450,10 +450,10 @@ dropTempTable <- function(sqlCtx, tableName) {
 #'\dontrun{
 #' sc <- sparkR.init()
 #' sqlCtx <- sparkRSQL.init(sc)
-#' df <- load(sqlCtx, "path/to/file.json", source = "json")
+#' df <- read.df(sqlCtx, "path/to/file.json", source = "json")
 #' }
 
-loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
+read.df <- function(sqlCtx, path = NULL, source = NULL, ...) {
   options <- varargsToEnv(...)
   if (!is.null(path)) {
     options[['path']] <- path
@@ -462,6 +462,13 @@ loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
   dataFrame(sdf)
 }
 
+#' @aliases loadDF
+#' @export
+
+loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
+  read.df(sqlCtx, path, source, ...)
+}
+
 #' Create an external table
 #'
 #' Creates an external table based on the dataset in a data source,
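
Likewise, loadDF() is reimplemented as a wrapper that forwards to the new read.df(), keeping old scripts working. The two spellings are equivalent (jsonPath is a placeholder):

  df1 <- read.df(sqlCtx, jsonPath, "json")  # new name, matching the Scala API
  df2 <- loadDF(sqlCtx, jsonPath, "json")   # old name; forwards to read.df()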

http://git-wip-us.apache.org/repos/asf/spark/blob/df9b94a5/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 557128a..6d2bfb1 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -456,19 +456,19 @@ setGeneric("rename", function(x, ...) { standardGeneric("rename") })
 #' @export
 setGeneric("registerTempTable", function(x, tableName) { standardGeneric("registerTempTable") })
 
-#' @rdname sampleDF
+#' @rdname sample
 #' @export
-setGeneric("sample_frac",
+setGeneric("sample",
            function(x, withReplacement, fraction, seed) {
-             standardGeneric("sample_frac")
-          })
+             standardGeneric("sample")
+           })
 
-#' @rdname sampleDF
+#' @rdname sample
 #' @export
-setGeneric("sampleDF",
+setGeneric("sample_frac",
            function(x, withReplacement, fraction, seed) {
-             standardGeneric("sampleDF")
-          })
+             standardGeneric("sample_frac")
+           })
 
 #' @rdname saveAsParquetFile
 #' @export
@@ -480,7 +480,11 @@ setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
   standardGeneric("saveAsTable")
 })
 
-#' @rdname saveAsTable
+#' @rdname write.df
+#' @export
+setGeneric("write.df", function(df, path, source, mode, ...) { standardGeneric("write.df") })
+
+#' @rdname write.df
 #' @export
 setGeneric("saveDF", function(df, path, source, mode, ...) { standardGeneric("saveDF") })
 

http://git-wip-us.apache.org/repos/asf/spark/blob/df9b94a5/R/pkg/inst/tests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index 99c2883..1109e8f 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -209,18 +209,18 @@ test_that("registerTempTable() results in a queryable table and sql() results in
 })
 
 test_that("insertInto() on a registered table", {
-  df <- loadDF(sqlCtx, jsonPath, "json")
-  saveDF(df, parquetPath, "parquet", "overwrite")
-  dfParquet <- loadDF(sqlCtx, parquetPath, "parquet")
+  df <- read.df(sqlCtx, jsonPath, "json")
+  write.df(df, parquetPath, "parquet", "overwrite")
+  dfParquet <- read.df(sqlCtx, parquetPath, "parquet")
 
   lines <- c("{\"name\":\"Bob\", \"age\":24}",
              "{\"name\":\"James\", \"age\":35}")
   jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".tmp")
   parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
   writeLines(lines, jsonPath2)
-  df2 <- loadDF(sqlCtx, jsonPath2, "json")
-  saveDF(df2, parquetPath2, "parquet", "overwrite")
-  dfParquet2 <- loadDF(sqlCtx, parquetPath2, "parquet")
+  df2 <- read.df(sqlCtx, jsonPath2, "json")
+  write.df(df2, parquetPath2, "parquet", "overwrite")
+  dfParquet2 <- read.df(sqlCtx, parquetPath2, "parquet")
 
   registerTempTable(dfParquet, "table1")
   insertInto(dfParquet2, "table1")
@@ -421,12 +421,12 @@ test_that("distinct() on DataFrames", {
   expect_true(count(uniques) == 3)
 })
 
-test_that("sampleDF on a DataFrame", {
+test_that("sample on a DataFrame", {
   df <- jsonFile(sqlCtx, jsonPath)
-  sampled <- sampleDF(df, FALSE, 1.0)
+  sampled <- sample(df, FALSE, 1.0)
   expect_equal(nrow(collect(sampled)), count(df))
   expect_true(inherits(sampled, "DataFrame"))
-  sampled2 <- sampleDF(df, FALSE, 0.1)
+  sampled2 <- sample(df, FALSE, 0.1)
   expect_true(count(sampled2) < 3)
 
   # Also test sample_frac
@@ -491,16 +491,16 @@ test_that("column calculation", {
   expect_true(count(df2) == 3)
 })
 
-test_that("load() from json file", {
-  df <- loadDF(sqlCtx, jsonPath, "json")
+test_that("read.df() from json file", {
+  df <- read.df(sqlCtx, jsonPath, "json")
   expect_true(inherits(df, "DataFrame"))
   expect_true(count(df) == 3)
 })
 
-test_that("save() as parquet file", {
-  df <- loadDF(sqlCtx, jsonPath, "json")
-  saveDF(df, parquetPath, "parquet", mode="overwrite")
-  df2 <- loadDF(sqlCtx, parquetPath, "parquet")
+test_that("write.df() as parquet file", {
+  df <- read.df(sqlCtx, jsonPath, "json")
+  write.df(df, parquetPath, "parquet", mode="overwrite")
+  df2 <- read.df(sqlCtx, parquetPath, "parquet")
   expect_true(inherits(df2, "DataFrame"))
   expect_true(count(df2) == 3)
 })
@@ -670,7 +670,7 @@ test_that("unionAll(), except(), and intersect() on a DataFrame", {
              "{\"name\":\"James\", \"age\":35}")
   jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
   writeLines(lines, jsonPath2)
-  df2 <- loadDF(sqlCtx, jsonPath2, "json")
+  df2 <- read.df(sqlCtx, jsonPath2, "json")
 
   unioned <- arrange(unionAll(df, df2), df$age)
   expect_true(inherits(unioned, "DataFrame"))
@@ -712,9 +712,9 @@ test_that("mutate() and rename()", {
   expect_true(columns(newDF2)[1] == "newerAge")
 })
 
-test_that("saveDF() on DataFrame and works with parquetFile", {
+test_that("write.df() on DataFrame and works with parquetFile", {
   df <- jsonFile(sqlCtx, jsonPath)
-  saveDF(df, parquetPath, "parquet", mode="overwrite")
+  write.df(df, parquetPath, "parquet", mode="overwrite")
   parquetDF <- parquetFile(sqlCtx, parquetPath)
   expect_true(inherits(parquetDF, "DataFrame"))
   expect_equal(count(df), count(parquetDF))
@@ -722,9 +722,9 @@ test_that("saveDF() on DataFrame and works with parquetFile", {
 
 test_that("parquetFile works with multiple input paths", {
   df <- jsonFile(sqlCtx, jsonPath)
-  saveDF(df, parquetPath, "parquet", mode="overwrite")
+  write.df(df, parquetPath, "parquet", mode="overwrite")
   parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
-  saveDF(df, parquetPath2, "parquet", mode="overwrite")
+  write.df(df, parquetPath2, "parquet", mode="overwrite")
   parquetDF <- parquetFile(sqlCtx, parquetPath, parquetPath2)
   expect_true(inherits(parquetDF, "DataFrame"))
   expect_true(count(parquetDF) == count(df)*2)
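
The sampling tests above exercise both spellings; per the DataFrame.R hunk, sample_frac() simply forwards to the renamed sample(), so the following are equivalent (df is a placeholder DataFrame):

  s1 <- sample(df, FALSE, 0.1)       # renamed from sampleDF()
  s2 <- sample_frac(df, FALSE, 0.1)  # alias; forwards to sample()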

