You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sh...@apache.org on 2015/04/14 05:43:28 UTC

spark git commit: [Minor][SparkR] Minor refactor and removes redundancy related to cleanClosure.

Repository: spark
Updated Branches:
  refs/heads/master b45059d0d -> 0ba3fdd59


[Minor][SparkR] Minor refactor and removes redundancy related to cleanClosure.

1. Only use `cleanClosure` in creation of RRDDs. Normally, user and developer do not need to call `cleanClosure` in their function definition.
2. Removes redundant code (e.g. unnecessary wrapper functions) related to `cleanClosure`.

Author: hlin09 <hl...@gmail.com>

Closes #5495 from hlin09/cleanClosureFix and squashes the following commits:

74ec303 [hlin09] Minor refactor and removes redundancy.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ba3fdd5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ba3fdd5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ba3fdd5

Branch: refs/heads/master
Commit: 0ba3fdd5992cf09bd38303ebff34d2ed19e5e09b
Parents: b45059d
Author: hlin09 <hl...@gmail.com>
Authored: Mon Apr 13 20:43:24 2015 -0700
Committer: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Committed: Mon Apr 13 20:43:24 2015 -0700

----------------------------------------------------------------------
 R/pkg/R/RDD.R     | 16 ++++------------
 R/pkg/R/pairRDD.R |  4 ----
 2 files changed, 4 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0ba3fdd5/R/pkg/R/RDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index d6a7500..820027e 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -85,7 +85,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
 
   if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
     # This transformation is the first in its stage:
-    .Object@func <- func
+    .Object@func <- cleanClosure(func)
     .Object@prev_jrdd <- getJRDD(prev)
     .Object@env$prev_serializedMode <- prev@env$serializedMode
     # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
@@ -94,7 +94,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
     pipelinedFunc <- function(split, iterator) {
       func(split, prev@func(split, iterator))
     }
-    .Object@func <- pipelinedFunc
+    .Object@func <- cleanClosure(pipelinedFunc)
     .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
     # Get the serialization mode of the parent RDD
     .Object@env$prev_serializedMode <- prev@env$prev_serializedMode
@@ -144,17 +144,13 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
               return(rdd@env$jrdd_val)
             }
 
-            computeFunc <- function(split, part) {
-              rdd@func(split, part)
-            }
-
             packageNamesArr <- serialize(.sparkREnv[[".packages"]],
                                          connection = NULL)
 
             broadcastArr <- lapply(ls(.broadcastNames),
                                    function(name) { get(name, .broadcastNames) })
 
-            serializedFuncArr <- serialize(computeFunc, connection = NULL)
+            serializedFuncArr <- serialize(rdd@func, connection = NULL)
 
             prev_jrdd <- rdd@prev_jrdd
 
@@ -551,11 +547,7 @@ setMethod("mapPartitions",
 setMethod("lapplyPartitionsWithIndex",
           signature(X = "RDD", FUN = "function"),
           function(X, FUN) {
-            FUN <- cleanClosure(FUN)
-            closureCapturingFunc <- function(split, part) {
-              FUN(split, part)
-            }
-            PipelinedRDD(X, closureCapturingFunc)
+            PipelinedRDD(X, FUN)
           })
 
 #' @rdname lapplyPartitionsWithIndex

http://git-wip-us.apache.org/repos/asf/spark/blob/0ba3fdd5/R/pkg/R/pairRDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
index c2396c3..739d399 100644
--- a/R/pkg/R/pairRDD.R
+++ b/R/pkg/R/pairRDD.R
@@ -694,10 +694,6 @@ setMethod("cogroup",
             for (i in 1:rddsLen) {
               rdds[[i]] <- lapply(rdds[[i]], 
                                   function(x) { list(x[[1]], list(i, x[[2]])) })
-              # TODO(hao): As issue [SparkR-142] mentions, the right value of i
-              # will not be captured into UDF if getJRDD is not invoked.
-              # It should be resolved together with that issue.
-              getJRDD(rdds[[i]])  # Capture the closure.
             }
             union.rdd <- Reduce(unionRDD, rdds)
             group.func <- function(vlist) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org