You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sh...@apache.org on 2015/04/14 05:43:28 UTC
spark git commit: [Minor][SparkR] Minor refactor and removes
redundancy related to cleanClosure.
Repository: spark
Updated Branches:
refs/heads/master b45059d0d -> 0ba3fdd59
[Minor][SparkR] Minor refactor and removes redundancy related to cleanClosure.
1. Only use `cleanClosure` in creation of RRDDs. Normally, user and developer do not need to call `cleanClosure` in their function definition.
2. Removes redundant code (e.g. unnecessary wrapper functions) related to `cleanClosure`.
Author: hlin09 <hl...@gmail.com>
Closes #5495 from hlin09/cleanClosureFix and squashes the following commits:
74ec303 [hlin09] Minor refactor and removes redundancy.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ba3fdd5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ba3fdd5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ba3fdd5
Branch: refs/heads/master
Commit: 0ba3fdd5992cf09bd38303ebff34d2ed19e5e09b
Parents: b45059d
Author: hlin09 <hl...@gmail.com>
Authored: Mon Apr 13 20:43:24 2015 -0700
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Mon Apr 13 20:43:24 2015 -0700
----------------------------------------------------------------------
R/pkg/R/RDD.R | 16 ++++------------
R/pkg/R/pairRDD.R | 4 ----
2 files changed, 4 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0ba3fdd5/R/pkg/R/RDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index d6a7500..820027e 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -85,7 +85,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
# This transformation is the first in its stage:
- .Object@func <- func
+ .Object@func <- cleanClosure(func)
.Object@prev_jrdd <- getJRDD(prev)
.Object@env$prev_serializedMode <- prev@env$serializedMode
# NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
@@ -94,7 +94,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
pipelinedFunc <- function(split, iterator) {
func(split, prev@func(split, iterator))
}
- .Object@func <- pipelinedFunc
+ .Object@func <- cleanClosure(pipelinedFunc)
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
# Get the serialization mode of the parent RDD
.Object@env$prev_serializedMode <- prev@env$prev_serializedMode
@@ -144,17 +144,13 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
return(rdd@env$jrdd_val)
}
- computeFunc <- function(split, part) {
- rdd@func(split, part)
- }
-
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)
broadcastArr <- lapply(ls(.broadcastNames),
function(name) { get(name, .broadcastNames) })
- serializedFuncArr <- serialize(computeFunc, connection = NULL)
+ serializedFuncArr <- serialize(rdd@func, connection = NULL)
prev_jrdd <- rdd@prev_jrdd
@@ -551,11 +547,7 @@ setMethod("mapPartitions",
setMethod("lapplyPartitionsWithIndex",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
- FUN <- cleanClosure(FUN)
- closureCapturingFunc <- function(split, part) {
- FUN(split, part)
- }
- PipelinedRDD(X, closureCapturingFunc)
+ PipelinedRDD(X, FUN)
})
#' @rdname lapplyPartitionsWithIndex
http://git-wip-us.apache.org/repos/asf/spark/blob/0ba3fdd5/R/pkg/R/pairRDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index c2396c3..739d399 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -694,10 +694,6 @@ setMethod("cogroup",
for (i in 1:rddsLen) {
rdds[[i]] <- lapply(rdds[[i]],
function(x) { list(x[[1]], list(i, x[[2]])) })
- # TODO(hao): As issue [SparkR-142] mentions, the right value of i
- # will not be captured into UDF if getJRDD is not invoked.
- # It should be resolved together with that issue.
- getJRDD(rdds[[i]]) # Capture the closure.
}
union.rdd <- Reduce(unionRDD, rdds)
group.func <- function(vlist) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org