You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by fe...@apache.org on 2016/10/26 20:26:51 UTC

spark git commit: [SPARK-17961][SPARKR][SQL] Add storageLevel to DataFrame for SparkR

Repository: spark
Updated Branches:
  refs/heads/master ea3605e82 -> fb0a8a8dd


[SPARK-17961][SPARKR][SQL] Add storageLevel to DataFrame for SparkR

## What changes were proposed in this pull request?

Add storageLevel to DataFrame for SparkR.
This is similar to this PR: https://github.com/apache/spark/pull/13780

but in R I do not make a class for `StorageLevel`;
instead I add a helper function `storageLevelToString`

## How was this patch tested?

A test was added to test_sparkSQL.R.

Author: WeichenXu <We...@outlook.com>

Closes #15516 from WeichenXu123/storageLevel_df_r.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb0a8a8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb0a8a8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb0a8a8d

Branch: refs/heads/master
Commit: fb0a8a8dd7e8985676a846684b956e2d988875c6
Parents: ea3605e
Author: WeichenXu <We...@outlook.com>
Authored: Wed Oct 26 13:26:43 2016 -0700
Committer: Felix Cheung <fe...@apache.org>
Committed: Wed Oct 26 13:26:43 2016 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |  1 +
 R/pkg/R/DataFrame.R                       | 28 +++++++++++++++++-
 R/pkg/R/RDD.R                             |  2 +-
 R/pkg/R/generics.R                        |  6 +++-
 R/pkg/R/utils.R                           | 41 ++++++++++++++++++++++++++
 R/pkg/inst/tests/testthat/test_sparkSQL.R |  5 +++-
 6 files changed, 79 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 8718185..eb314f4 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -124,6 +124,7 @@ exportMethods("arrange",
               "selectExpr",
               "show",
               "showDF",
+              "storageLevel",
               "subset",
               "summarize",
               "summary",

http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index b6ce838..be34e4b 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -633,7 +633,7 @@ setMethod("persist",
 #' @param ... further arguments to be passed to or from other methods.
 #'
 #' @family SparkDataFrame functions
-#' @rdname unpersist-methods
+#' @rdname unpersist
 #' @aliases unpersist,SparkDataFrame-method
 #' @name unpersist
 #' @export
@@ -654,6 +654,32 @@ setMethod("unpersist",
             x
           })
 
+#' StorageLevel
+#'
+#' Get the storage level of this SparkDataFrame, rendered as a string.
+#'
+#' @param x the SparkDataFrame whose storage level is returned.
+#'
+#' @family SparkDataFrame functions
+#' @rdname storageLevel
+#' @aliases storageLevel,SparkDataFrame-method
+#' @name storageLevel
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' persist(df, "MEMORY_AND_DISK")
+#' storageLevel(df)
+#'}
+#' @note storageLevel since 2.1.0
+setMethod("storageLevel",
+          signature(x = "SparkDataFrame"),
+          function(x) {
+            storageLevelToString(callJMethod(x@sdf, "storageLevel"))
+          })
+
 #' Repartition
 #'
 #' The following options for repartition are possible:

http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/R/RDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 6cd0704..0f1162f 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -261,7 +261,7 @@ setMethod("persistRDD",
 #' cache(rdd) # rdd@@env$isCached == TRUE
 #' unpersistRDD(rdd) # rdd@@env$isCached == FALSE
 #'}
-#' @rdname unpersist-methods
+#' @rdname unpersist
 #' @aliases unpersist,RDD-method
 #' @noRd
 setMethod("unpersistRDD",

http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 5549cd7..4569fe4 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -691,6 +691,10 @@ setGeneric("selectExpr", function(x, expr, ...) { standardGeneric("selectExpr")
 #' @export
 setGeneric("showDF", function(x, ...) { standardGeneric("showDF") })
 
+#' @rdname storageLevel
+#' @export
+setGeneric("storageLevel", function(x) { standardGeneric("storageLevel") })
+
 #' @rdname subset
 #' @export
 setGeneric("subset", function(x, ...) { standardGeneric("subset") })
@@ -715,7 +719,7 @@ setGeneric("union", function(x, y) { standardGeneric("union") })
 #' @export
 setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })
 
-#' @rdname unpersist-methods
+#' @rdname unpersist
 #' @export
 setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/R/utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index fa8bb0f..c4e78cb 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -385,6 +385,47 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
                          "OFF_HEAP" = callJStatic(storageLevelClass, "OFF_HEAP"))
 }
 
+storageLevelToString <- function(levelObj) {  # render levelObj as "SHORT_NAME - toString()"
+  useDisk <- callJMethod(levelObj, "useDisk")
+  useMemory <- callJMethod(levelObj, "useMemory")
+  useOffHeap <- callJMethod(levelObj, "useOffHeap")
+  deserialized <- callJMethod(levelObj, "deserialized")
+  replication <- callJMethod(levelObj, "replication")  # compared against 1 and 2 below
+  shortName <- if (!useDisk && !useMemory && !useOffHeap && !deserialized && replication == 1) {
+    "NONE"
+  } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 1) {
+    "DISK_ONLY"
+  } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 2) {
+    "DISK_ONLY_2"
+  } else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
+    "MEMORY_ONLY"
+  } else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {
+    "MEMORY_ONLY_2"
+  } else if (!useDisk && useMemory && !useOffHeap && !deserialized && replication == 1) {
+    "MEMORY_ONLY_SER"
+  } else if (!useDisk && useMemory && !useOffHeap && !deserialized && replication == 2) {
+    "MEMORY_ONLY_SER_2"
+  } else if (useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
+    "MEMORY_AND_DISK"
+  } else if (useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {
+    "MEMORY_AND_DISK_2"
+  } else if (useDisk && useMemory && !useOffHeap && !deserialized && replication == 1) {
+    "MEMORY_AND_DISK_SER"
+  } else if (useDisk && useMemory && !useOffHeap && !deserialized && replication == 2) {
+    "MEMORY_AND_DISK_SER_2"
+  } else if (useDisk && useMemory && useOffHeap && !deserialized && replication == 1) {
+    "OFF_HEAP"
+  } else {
+    NULL  # no listed combination matched; only the toString() text is returned
+  }
+  fullInfo <- callJMethod(levelObj, "toString")
+  if (is.null(shortName)) {
+    fullInfo
+  } else {
+    paste(shortName, "-", fullInfo)  # default sep " " gives "NAME - <toString() text>"
+  }
+}
+
 # Utility function for functions where an argument needs to be integer but we want to allow
 # the user to type (for example) `5` instead of `5L` to avoid a confusing error message.
 numToInt <- function(num) {

http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index e77dbde..9289db5 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -796,7 +796,7 @@ test_that("multiple pipeline transformations result in an RDD with the correct v
   expect_false(collectRDD(second)[[3]]$testCol)
 })
 
-test_that("cache(), persist(), and unpersist() on a DataFrame", {
+test_that("cache(), storageLevel(), persist(), and unpersist() on a DataFrame", {
   df <- read.json(jsonPath)
   expect_false(df@env$isCached)
   cache(df)
@@ -808,6 +808,9 @@ test_that("cache(), persist(), and unpersist() on a DataFrame", {
   persist(df, "MEMORY_AND_DISK")
   expect_true(df@env$isCached)
 
+  expect_equal(storageLevel(df),
+    "MEMORY_AND_DISK - StorageLevel(disk, memory, deserialized, 1 replicas)")
+
   unpersist(df)
   expect_false(df@env$isCached)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org