You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by fe...@apache.org on 2016/10/26 20:26:51 UTC
spark git commit: [SPARK-17961][SPARKR][SQL] Add storageLevel to
DataFrame for SparkR
Repository: spark
Updated Branches:
refs/heads/master ea3605e82 -> fb0a8a8dd
[SPARK-17961][SPARKR][SQL] Add storageLevel to DataFrame for SparkR
## What changes were proposed in this pull request?
Add storageLevel to DataFrame for SparkR.
This is similar to this PR: https://github.com/apache/spark/pull/13780
but in R I do not create a class for `StorageLevel`;
instead I add a helper method `storageLevelToString`.
## How was this patch tested?
A test was added.
Author: WeichenXu <We...@outlook.com>
Closes #15516 from WeichenXu123/storageLevel_df_r.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb0a8a8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb0a8a8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb0a8a8d
Branch: refs/heads/master
Commit: fb0a8a8dd7e8985676a846684b956e2d988875c6
Parents: ea3605e
Author: WeichenXu <We...@outlook.com>
Authored: Wed Oct 26 13:26:43 2016 -0700
Committer: Felix Cheung <fe...@apache.org>
Committed: Wed Oct 26 13:26:43 2016 -0700
----------------------------------------------------------------------
R/pkg/NAMESPACE | 1 +
R/pkg/R/DataFrame.R | 28 +++++++++++++++++-
R/pkg/R/RDD.R | 2 +-
R/pkg/R/generics.R | 6 +++-
R/pkg/R/utils.R | 41 ++++++++++++++++++++++++++
R/pkg/inst/tests/testthat/test_sparkSQL.R | 5 +++-
6 files changed, 79 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 8718185..eb314f4 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -124,6 +124,7 @@ exportMethods("arrange",
"selectExpr",
"show",
"showDF",
+ "storageLevel",
"subset",
"summarize",
"summary",
http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index b6ce838..be34e4b 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -633,7 +633,7 @@ setMethod("persist",
#' @param ... further arguments to be passed to or from other methods.
#'
#' @family SparkDataFrame functions
-#' @rdname unpersist-methods
+#' @rdname unpersist
#' @aliases unpersist,SparkDataFrame-method
#' @name unpersist
#' @export
@@ -654,6 +654,32 @@ setMethod("unpersist",
x
})
+#' StorageLevel
+#'
+#' Get the storage level of this SparkDataFrame.
+#'
+#' @param x the SparkDataFrame whose storage level is queried.
+#' @return a character string describing the current storage level, e.g.
+#'         "MEMORY_AND_DISK - StorageLevel(disk, memory, deserialized, 1 replicas)".
+#'
+#' @family SparkDataFrame functions
+#' @rdname storageLevel
+#' @aliases storageLevel,SparkDataFrame-method
+#' @name storageLevel
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' persist(df, "MEMORY_AND_DISK")
+#' storageLevel(df)
+#'}
+#' @note storageLevel since 2.1.0
+setMethod("storageLevel",
+ signature(x = "SparkDataFrame"),
+ function(x) {
+ storageLevelToString(callJMethod(x@sdf, "storageLevel"))
+ })
+
#' Repartition
#'
#' The following options for repartition are possible:
http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/R/RDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 6cd0704..0f1162f 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -261,7 +261,7 @@ setMethod("persistRDD",
#' cache(rdd) # rdd@@env$isCached == TRUE
#' unpersistRDD(rdd) # rdd@@env$isCached == FALSE
#'}
-#' @rdname unpersist-methods
+#' @rdname unpersist
#' @aliases unpersist,RDD-method
#' @noRd
setMethod("unpersistRDD",
http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 5549cd7..4569fe4 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -691,6 +691,10 @@ setGeneric("selectExpr", function(x, expr, ...) { standardGeneric("selectExpr")
#' @export
setGeneric("showDF", function(x, ...) { standardGeneric("showDF") })
+#' @rdname storageLevel
+#' @export
+setGeneric("storageLevel", function(x) { standardGeneric("storageLevel") })
+
#' @rdname subset
#' @export
setGeneric("subset", function(x, ...) { standardGeneric("subset") })
@@ -715,7 +719,7 @@ setGeneric("union", function(x, y) { standardGeneric("union") })
#' @export
setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })
-#' @rdname unpersist-methods
+#' @rdname unpersist
#' @export
setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/R/utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index fa8bb0f..c4e78cb 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -385,6 +385,47 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
"OFF_HEAP" = callJStatic(storageLevelClass, "OFF_HEAP"))
}
+# Map a Java StorageLevel object to a human-readable string: the canonical
+# short name (e.g. "MEMORY_AND_DISK") when the flag combination matches one
+# of Spark's predefined levels, followed by the JVM's full toString() output.
+storageLevelToString <- function(levelObj) {
+ useDisk <- callJMethod(levelObj, "useDisk")
+ useMemory <- callJMethod(levelObj, "useMemory")
+ useOffHeap <- callJMethod(levelObj, "useOffHeap")
+ deserialized <- callJMethod(levelObj, "deserialized")
+ replication <- callJMethod(levelObj, "replication")
+ # Match the flag combination against the predefined levels; a custom
+ # combination yields NULL so only the full description is returned below.
+ shortName <- if (!useDisk && !useMemory && !useOffHeap && !deserialized && replication == 1) {
+ "NONE"
+ } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 1) {
+ "DISK_ONLY"
+ } else if (useDisk && !useMemory && !useOffHeap && !deserialized && replication == 2) {
+ "DISK_ONLY_2"
+ } else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
+ "MEMORY_ONLY"
+ } else if (!useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {
+ "MEMORY_ONLY_2"
+ } else if (!useDisk && useMemory && !useOffHeap && !deserialized && replication == 1) {
+ "MEMORY_ONLY_SER"
+ } else if (!useDisk && useMemory && !useOffHeap && !deserialized && replication == 2) {
+ "MEMORY_ONLY_SER_2"
+ } else if (useDisk && useMemory && !useOffHeap && deserialized && replication == 1) {
+ "MEMORY_AND_DISK"
+ } else if (useDisk && useMemory && !useOffHeap && deserialized && replication == 2) {
+ "MEMORY_AND_DISK_2"
+ } else if (useDisk && useMemory && !useOffHeap && !deserialized && replication == 1) {
+ "MEMORY_AND_DISK_SER"
+ } else if (useDisk && useMemory && !useOffHeap && !deserialized && replication == 2) {
+ "MEMORY_AND_DISK_SER_2"
+ } else if (useDisk && useMemory && useOffHeap && !deserialized && replication == 1) {
+ "OFF_HEAP"
+ } else {
+ NULL
+ }
+ # Full JVM description, e.g. "StorageLevel(disk, memory, deserialized, 1 replicas)".
+ fullInfo <- callJMethod(levelObj, "toString")
+ if (is.null(shortName)) {
+ fullInfo
+ } else {
+ paste(shortName, "-", fullInfo)
+ }
+}
+
# Utility function for functions where an argument needs to be integer but we want to allow
# the user to type (for example) `5` instead of `5L` to avoid a confusing error message.
numToInt <- function(num) {
http://git-wip-us.apache.org/repos/asf/spark/blob/fb0a8a8d/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index e77dbde..9289db5 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -796,7 +796,7 @@ test_that("multiple pipeline transformations result in an RDD with the correct v
expect_false(collectRDD(second)[[3]]$testCol)
})
-test_that("cache(), persist(), and unpersist() on a DataFrame", {
+test_that("cache(), storageLevel(), persist(), and unpersist() on a DataFrame", {
df <- read.json(jsonPath)
expect_false(df@env$isCached)
cache(df)
@@ -808,6 +808,9 @@ test_that("cache(), persist(), and unpersist() on a DataFrame", {
persist(df, "MEMORY_AND_DISK")
expect_true(df@env$isCached)
+ expect_equal(storageLevel(df),
+ "MEMORY_AND_DISK - StorageLevel(disk, memory, deserialized, 1 replicas)")
+
unpersist(df)
expect_false(df@env$isCached)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org