Posted to commits@spark.apache.org by sh...@apache.org on 2015/04/09 07:46:01 UTC

[5/7] spark git commit: [SPARK-5654] Integrate SparkR

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/RDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
new file mode 100644
index 0000000..604ad03
--- /dev/null
+++ b/R/pkg/R/RDD.R
@@ -0,0 +1,1539 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# RDD in R implemented in S4 OO system.
+
+setOldClass("jobj")
+
+#' @title S4 class that represents an RDD
+#' @description RDD can be created using functions like
+#'              \code{parallelize}, \code{textFile} etc.
+#' @rdname RDD
+#' @seealso parallelize, textFile
+#'
+#' @slot env An R environment that stores bookkeeping states of the RDD
+#' @slot jrdd Java object reference to the JavaRDD backing this RDD
+#' @export
+setClass("RDD",
+         slots = list(env = "environment",
+                      jrdd = "jobj"))
+
+setClass("PipelinedRDD",
+         slots = list(prev = "RDD",
+                      func = "function",
+                      prev_jrdd = "jobj"),
+         contains = "RDD")
+
+setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
+                                        isCached, isCheckpointed) {
+  # Check that RDD constructor is using the correct version of serializedMode
+  stopifnot(class(serializedMode) == "character")
+  stopifnot(serializedMode %in% c("byte", "string", "row"))
+  # RDD has three serialization types:
+  # byte: The RDD stores data serialized in R.
+  # string: The RDD stores data as strings.
+  # row: The RDD stores the serialized rows of a DataFrame.
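+  # For example (illustrative, assuming a SparkContext `sc`):
+  #   getSerializedMode(parallelize(sc, 1:10))       # "byte"
+  #   getSerializedMode(textFile(sc, "hdfs://..."))  # "string"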
+  
+  # We use an environment to store mutable states inside an RDD object.
+  # Note that R's call-by-value semantics makes modifying slots inside an
+  # object (passed as an argument into a function, such as cache()) difficult:
+  # i.e. one needs to make a copy of the RDD object and set the new slot value
+  # there.
+
+  # The slots are inheritable from the superclass. Here, both `env' and `jrdd' are
+  # inherited from RDD, but only the former is used.
+  .Object@env <- new.env()
+  .Object@env$isCached <- isCached
+  .Object@env$isCheckpointed <- isCheckpointed
+  .Object@env$serializedMode <- serializedMode
+
+  .Object@jrdd <- jrdd
+  .Object
+})
+
+setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
+  .Object@env <- new.env()
+  .Object@env$isCached <- FALSE
+  .Object@env$isCheckpointed <- FALSE
+  .Object@env$jrdd_val <- jrdd_val
+  if (!is.null(jrdd_val)) {
+    # This tracks the serialization mode for jrdd_val
+    .Object@env$serializedMode <- prev@env$serializedMode
+  }
+
+  .Object@prev <- prev
+
+  isPipelinable <- function(rdd) {
+    e <- rdd@env
+    !(e$isCached || e$isCheckpointed)
+  }
+
+  if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
+    # This transformation is the first in its stage:
+    .Object@func <- func
+    .Object@prev_jrdd <- getJRDD(prev)
+    .Object@env$prev_serializedMode <- prev@env$serializedMode
+    # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
+    # prev_serializedMode is used during the delayed computation of JRDD in getJRDD
+  } else {
+    pipelinedFunc <- function(split, iterator) {
+      func(split, prev@func(split, iterator))
+    }
+    .Object@func <- pipelinedFunc
+    .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
+    # Get the serialization mode of the parent RDD
+    .Object@env$prev_serializedMode <- prev@env$prev_serializedMode
+  }
+
+  .Object
+})
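+# Illustration of pipelining (a sketch, assuming a SparkContext `sc`):
+#   rdd  <- parallelize(sc, 1:10)
+#   rdd2 <- lapply(rdd, function(x) x + 1)   # PipelinedRDD over rdd
+#   rdd3 <- lapply(rdd2, function(x) x * 2)  # still pipelined: both functions are
+#                                            # composed and run over rdd's JavaRDD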
+
+#' @rdname RDD
+#' @export
+#'
+#' @param jrdd Java object reference to the backing JavaRDD
+#' @param serializedMode Use "byte" if the RDD stores data serialized in R, "string" if the RDD
+#' stores strings, and "row" if the RDD stores the rows of a DataFrame
+#' @param isCached TRUE if the RDD is cached
+#' @param isCheckpointed TRUE if the RDD has been checkpointed
+RDD <- function(jrdd, serializedMode = "byte", isCached = FALSE,
+                isCheckpointed = FALSE) {
+  new("RDD", jrdd, serializedMode, isCached, isCheckpointed)
+}
+
+PipelinedRDD <- function(prev, func) {
+  new("PipelinedRDD", prev, func, NULL)
+}
+
+# Return the serialization mode for an RDD.
+setGeneric("getSerializedMode", function(rdd, ...) { standardGeneric("getSerializedMode") })
+# For normal RDDs we can directly read the serializedMode
+setMethod("getSerializedMode", signature(rdd = "RDD"), function(rdd) rdd@env$serializedMode )
+# For pipelined RDDs, if jrdd_val is set then serializedMode should exist;
+# if not, we return the default serialization mode of "byte" since we don't know
+# the serialization mode at this point.
+setMethod("getSerializedMode", signature(rdd = "PipelinedRDD"),
+          function(rdd) {
+            if (!is.null(rdd@env$jrdd_val)) {
+              return(rdd@env$serializedMode)
+            } else {
+              return("byte")
+            }
+          })
+
+# The jrdd accessor function.
+setMethod("getJRDD", signature(rdd = "RDD"), function(rdd) rdd@jrdd )
+setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
+          function(rdd, serializedMode = "byte") {
+            if (!is.null(rdd@env$jrdd_val)) {
+              return(rdd@env$jrdd_val)
+            }
+
+            computeFunc <- function(split, part) {
+              rdd@func(split, part)
+            }
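+            # The compute closure, the names of loaded packages, and the broadcast
+            # variables below are serialized and passed to the JVM-side RRDD,
+            # which evaluates them in R worker processes for each partition.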
+
+            packageNamesArr <- serialize(.sparkREnv[[".packages"]],
+                                         connection = NULL)
+
+            broadcastArr <- lapply(ls(.broadcastNames),
+                                   function(name) { get(name, .broadcastNames) })
+
+            serializedFuncArr <- serialize(computeFunc, connection = NULL)
+
+            prev_jrdd <- rdd@prev_jrdd
+
+            if (serializedMode == "string") {
+              rddRef <- newJObject("org.apache.spark.api.r.StringRRDD",
+                                   callJMethod(prev_jrdd, "rdd"),
+                                   serializedFuncArr,
+                                   rdd@env$prev_serializedMode,
+                                   packageNamesArr,
+                                   as.character(.sparkREnv[["libname"]]),
+                                   broadcastArr,
+                                   callJMethod(prev_jrdd, "classTag"))
+            } else {
+              rddRef <- newJObject("org.apache.spark.api.r.RRDD",
+                                   callJMethod(prev_jrdd, "rdd"),
+                                   serializedFuncArr,
+                                   rdd@env$prev_serializedMode,
+                                   serializedMode,
+                                   packageNamesArr,
+                                   as.character(.sparkREnv[["libname"]]),
+                                   broadcastArr,
+                                   callJMethod(prev_jrdd, "classTag"))
+            }
+            # Save the serialization flag after we create an RRDD
+            rdd@env$serializedMode <- serializedMode
+            rdd@env$jrdd_val <- callJMethod(rddRef, "asJavaRDD") # rddRef$asJavaRDD()
+            rdd@env$jrdd_val
+          })
+
+setValidity("RDD",
+            function(object) {
+              jrdd <- getJRDD(object)
+              cls <- callJMethod(jrdd, "getClass")
+              className <- callJMethod(cls, "getName")
+              if (grepl("spark.api.java.*RDD*", className)) {
+                TRUE
+              } else {
+                paste("Invalid RDD class ", className)
+              }
+            })
+
+
+############ Actions and Transformations ############
+
+#' Persist an RDD
+#'
+#' Persist this RDD with the default storage level (MEMORY_ONLY).
+#'
+#' @param x The RDD to cache
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' cache(rdd)
+#'}
+#' @rdname cache-methods
+#' @aliases cache,RDD-method
+setMethod("cache",
+          signature(x = "RDD"),
+          function(x) {
+            callJMethod(getJRDD(x), "cache")
+            x@env$isCached <- TRUE
+            x
+          })
+
+#' Persist an RDD
+#'
+#' Persist this RDD with the specified storage level. For details of the
+#' supported storage levels, refer to
+#' http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.
+#'
+#' @param x The RDD to persist
+#' @param newLevel The new storage level to be assigned
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' persist(rdd, "MEMORY_AND_DISK")
+#'}
+#' @rdname persist
+#' @aliases persist,RDD-method
+setMethod("persist",
+          signature(x = "RDD", newLevel = "character"),
+          function(x, newLevel) {
+            callJMethod(getJRDD(x), "persist", getStorageLevel(newLevel))
+            x@env$isCached <- TRUE
+            x
+          })
+
+#' Unpersist an RDD
+#'
+#' Mark the RDD as non-persistent, and remove all blocks for it from memory and
+#' disk.
+#'
+#' @param x The RDD to unpersist
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' cache(rdd) # rdd@@env$isCached == TRUE
+#' unpersist(rdd) # rdd@@env$isCached == FALSE
+#'}
+#' @rdname unpersist-methods
+#' @aliases unpersist,RDD-method
+setMethod("unpersist",
+          signature(x = "RDD"),
+          function(x) {
+            callJMethod(getJRDD(x), "unpersist")
+            x@env$isCached <- FALSE
+            x
+          })
+
+#' Checkpoint an RDD
+#'
+#' Mark this RDD for checkpointing. It will be saved to a file inside the
+#' checkpoint directory set with setCheckpointDir() and all references to its
+#' parent RDDs will be removed. This function must be called before any job has
+#' been executed on this RDD. It is strongly recommended that this RDD is
+#' persisted in memory; otherwise saving it to a file will require recomputation.
+#'
+#' @param x The RDD to checkpoint
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' setCheckpointDir(sc, "checkpoints")
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' checkpoint(rdd)
+#'}
+#' @rdname checkpoint-methods
+#' @aliases checkpoint,RDD-method
+setMethod("checkpoint",
+          signature(x = "RDD"),
+          function(x) {
+            jrdd <- getJRDD(x)
+            callJMethod(jrdd, "checkpoint")
+            x@env$isCheckpointed <- TRUE
+            x
+          })
+
+#' Gets the number of partitions of an RDD
+#'
+#' @param x An RDD.
+#' @return The number of partitions of the RDD as an integer.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' numPartitions(rdd)  # 2L
+#'}
+#' @rdname numPartitions
+#' @aliases numPartitions,RDD-method
+setMethod("numPartitions",
+          signature(x = "RDD"),
+          function(x) {
+            jrdd <- getJRDD(x)
+            partitions <- callJMethod(jrdd, "splits")
+            callJMethod(partitions, "size")
+          })
+
+#' Collect elements of an RDD
+#'
+#' @description
+#' \code{collect} returns a list that contains all of the elements in this RDD.
+#'
+#' @param x The RDD to collect
+#' @param ... Other optional arguments to collect
+#' @param flatten FALSE if the list should not be flattened
+#' @return a list containing elements in the RDD
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' collect(rdd) # list from 1 to 10
+#' collectPartition(rdd, 0L) # list from 1 to 5
+#'}
+#' @rdname collect-methods
+#' @aliases collect,RDD-method
+setMethod("collect",
+          signature(x = "RDD"),
+          function(x, flatten = TRUE) {
+            # Assumes a pairwise RDD is backed by a JavaPairRDD.
+            collected <- callJMethod(getJRDD(x), "collect")
+            convertJListToRList(collected, flatten,
+              serializedMode = getSerializedMode(x))
+          })
+
+
+#' @description
+#' \code{collectPartition} returns a list that contains all of the elements
+#' in the specified partition of the RDD.
+#' @param partitionId the partition to collect (starts from 0)
+#' @rdname collect-methods
+#' @aliases collectPartition,integer,RDD-method
+setMethod("collectPartition",
+          signature(x = "RDD", partitionId = "integer"),
+          function(x, partitionId) {
+            jPartitionsList <- callJMethod(getJRDD(x),
+                                           "collectPartitions",
+                                           as.list(as.integer(partitionId)))
+
+            jList <- jPartitionsList[[1]]
+            convertJListToRList(jList, flatten = TRUE,
+              serializedMode = getSerializedMode(x))
+          })
+
+#' @description
+#' \code{collectAsMap} returns a named list as a map that contains all of the elements
+#' in a key-value pair RDD. 
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L)
+#' collectAsMap(rdd) # list(`1` = 2, `3` = 4)
+#'}
+#' @rdname collect-methods
+#' @aliases collectAsMap,RDD-method
+setMethod("collectAsMap",
+          signature(x = "RDD"),
+          function(x) {
+            pairList <- collect(x)
+            map <- new.env()
+            lapply(pairList, function(i) { assign(as.character(i[[1]]), i[[2]], envir = map) })
+            as.list(map)
+          })
+
+#' Return the number of elements in the RDD.
+#'
+#' @param x The RDD to count
+#' @return number of elements in the RDD.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' count(rdd) # 10
+#' length(rdd) # Same as count
+#'}
+#' @rdname count
+#' @aliases count,RDD-method
+setMethod("count",
+          signature(x = "RDD"),
+          function(x) {
+            countPartition <- function(part) {
+              as.integer(length(part))
+            }
+            valsRDD <- lapplyPartition(x, countPartition)
+            vals <- collect(valsRDD)
+            sum(as.integer(vals))
+          })
+
+#' Return the number of elements in the RDD
+#' @export
+#' @rdname count
+setMethod("length",
+          signature(x = "RDD"),
+          function(x) {
+            count(x)
+          })
+
+#' Return the count of each unique value in this RDD as a list of
+#' (value, count) pairs.
+#'
+#' Same as countByValue in Spark.
+#'
+#' @param x The RDD to count
+#' @return A list of (value, count) pairs, where count is the number of times
+#' each unique value occurs in the RDD.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, c(1,2,3,2,1))
+#' countByValue(rdd) # (1,2L), (2,2L), (3,1L)
+#'}
+#' @rdname countByValue
+#' @aliases countByValue,RDD-method
+setMethod("countByValue",
+          signature(x = "RDD"),
+          function(x) {
+            ones <- lapply(x, function(item) { list(item, 1L) })
+            collect(reduceByKey(ones, `+`, numPartitions(x)))
+          })
+
+#' Apply a function to all elements
+#'
+#' This function creates a new RDD by applying the given transformation to all
+#' elements of the given RDD
+#'
+#' @param X The RDD to apply the transformation to.
+#' @param FUN the transformation to apply on each element
+#' @return a new RDD created by the transformation.
+#' @rdname lapply
+#' @aliases lapply
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' multiplyByTwo <- lapply(rdd, function(x) { x * 2 })
+#' collect(multiplyByTwo) # 2,4,6...
+#'}
+setMethod("lapply",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            func <- function(split, iterator) {
+              lapply(iterator, FUN)
+            }
+            lapplyPartitionsWithIndex(X, func)
+          })
+
+#' @rdname lapply
+#' @aliases map,RDD,function-method
+setMethod("map",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            lapply(X, FUN)
+          })
+
+#' Flatten results after applying a function to all elements
+#'
+#' This function returns a new RDD by first applying a function to all
+#' elements of this RDD, and then flattening the results.
+#'
+#' @param X The RDD to apply the transformation to.
+#' @param FUN the transformation to apply on each element
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' multiplyByTwo <- flatMap(rdd, function(x) { list(x*2, x*10) })
+#' collect(multiplyByTwo) # 2,20,4,40,6,60...
+#'}
+#' @rdname flatMap
+#' @aliases flatMap,RDD,function-method
+setMethod("flatMap",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            partitionFunc <- function(part) {
+              unlist(
+                lapply(part, FUN),
+                recursive = FALSE
+              )
+            }
+            lapplyPartition(X, partitionFunc)
+          })
+
+#' Apply a function to each partition of an RDD
+#'
+#' Return a new RDD by applying a function to each partition of this RDD.
+#'
+#' @param X The RDD to apply the transformation to.
+#' @param FUN the transformation to apply on each partition.
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' partitionSum <- lapplyPartition(rdd, function(part) { Reduce("+", part) })
+#' collect(partitionSum) # 15, 40
+#'}
+#' @rdname lapplyPartition
+#' @aliases lapplyPartition,RDD,function-method
+setMethod("lapplyPartition",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            lapplyPartitionsWithIndex(X, function(s, part) { FUN(part) })
+          })
+
+#' mapPartitions is the same as lapplyPartition.
+#'
+#' @rdname lapplyPartition
+#' @aliases mapPartitions,RDD,function-method
+setMethod("mapPartitions",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            lapplyPartition(X, FUN)
+          })
+
+#' Return a new RDD by applying a function to each partition of this RDD, while
+#' tracking the index of the original partition.
+#'
+#' @param X The RDD to apply the transformation to.
+#' @param FUN the transformation to apply on each partition; takes the partition
+#'        index and a list of elements in the particular partition.
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 5L)
+#' prod <- lapplyPartitionsWithIndex(rdd, function(split, part) {
+#'                                          split * Reduce("+", part) })
+#' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
+#'}
+#' @rdname lapplyPartitionsWithIndex
+#' @aliases lapplyPartitionsWithIndex,RDD,function-method
+setMethod("lapplyPartitionsWithIndex",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            FUN <- cleanClosure(FUN)
+            closureCapturingFunc <- function(split, part) {
+              FUN(split, part)
+            }
+            PipelinedRDD(X, closureCapturingFunc)
+          })
+
+#' @rdname lapplyPartitionsWithIndex
+#' @aliases mapPartitionsWithIndex,RDD,function-method
+setMethod("mapPartitionsWithIndex",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            lapplyPartitionsWithIndex(X, FUN)
+          })
+
+#' This function returns a new RDD containing only the elements that satisfy
+#' a predicate (i.e. returning TRUE in a given logical function).
+#' The same as `filter()' in Spark.
+#'
+#' @param x The RDD to be filtered.
+#' @param f A unary predicate function.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' unlist(collect(filterRDD(rdd, function (x) { x < 3 }))) # c(1, 2)
+#'}
+#' @rdname filterRDD
+#' @aliases filterRDD,RDD,function-method
+setMethod("filterRDD",
+          signature(x = "RDD", f = "function"),
+          function(x, f) {
+            filter.func <- function(part) {
+              Filter(f, part)
+            }
+            lapplyPartition(x, filter.func)
+          })
+
+#' @rdname filterRDD
+#' @aliases Filter
+setMethod("Filter",
+          signature(f = "function", x = "RDD"),
+          function(f, x) {
+            filterRDD(x, f)
+          })
+
+#' Reduce across elements of an RDD.
+#'
+#' This function reduces the elements of this RDD using the
+#' specified commutative and associative binary operator.
+#'
+#' @param x The RDD to reduce
+#' @param func Commutative and associative function to apply on elements
+#'             of the RDD.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' reduce(rdd, "+") # 55
+#'}
+#' @rdname reduce
+#' @aliases reduce,RDD,ANY-method
+setMethod("reduce",
+          signature(x = "RDD", func = "ANY"),
+          function(x, func) {
+
+            reducePartition <- function(part) {
+              Reduce(func, part)
+            }
+
+            partitionList <- collect(lapplyPartition(x, reducePartition),
+                                     flatten = FALSE)
+            Reduce(func, partitionList)
+          })
+
+#' Get the maximum element of an RDD.
+#'
+#' @param x The RDD to get the maximum element from
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' maximum(rdd) # 10
+#'}
+#' @rdname maximum
+#' @aliases maximum,RDD
+setMethod("maximum",
+          signature(x = "RDD"),
+          function(x) {
+            reduce(x, max)
+          })
+
+#' Get the minimum element of an RDD.
+#'
+#' @param x The RDD to get the minimum element from
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' minimum(rdd) # 1
+#'}
+#' @rdname minimum
+#' @aliases minimum,RDD
+setMethod("minimum",
+          signature(x = "RDD"),
+          function(x) {
+            reduce(x, min)
+          })
+
+#' Add up the elements in an RDD.
+#'
+#' @param x The RDD to add up the elements in
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' sumRDD(rdd) # 55
+#'}
+#' @rdname sumRDD 
+#' @aliases sumRDD,RDD
+setMethod("sumRDD",
+          signature(x = "RDD"),
+          function(x) {
+            reduce(x, "+")
+          })
+
+#' Applies a function to all elements in an RDD, and forces evaluation.
+#'
+#' @param x The RDD to apply the function
+#' @param func The function to be applied.
+#' @return invisible NULL.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' foreach(rdd, function(x) { save(x, file=...) })
+#'}
+#' @rdname foreach
+#' @aliases foreach,RDD,function-method
+setMethod("foreach",
+          signature(x = "RDD", func = "function"),
+          function(x, func) {
+            partition.func <- function(x) {
+              lapply(x, func)
+              NULL
+            }
+            invisible(collect(mapPartitions(x, partition.func)))
+          })
+
+#' Applies a function to each partition in an RDD, and forces evaluation.
+#'
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' foreachPartition(rdd, function(part) { save(part, file=...); NULL })
+#'}
+#' @rdname foreach
+#' @aliases foreachPartition,RDD,function-method
+setMethod("foreachPartition",
+          signature(x = "RDD", func = "function"),
+          function(x, func) {
+            invisible(collect(mapPartitions(x, func)))
+          })
+
+#' Take elements from an RDD.
+#'
+#' This function takes the first NUM elements in the RDD and
+#' returns them in a list.
+#'
+#' @param x The RDD to take elements from
+#' @param num Number of elements to take
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' take(rdd, 2L) # list(1, 2)
+#'}
+#' @rdname take
+#' @aliases take,RDD,numeric-method
+setMethod("take",
+          signature(x = "RDD", num = "numeric"),
+          function(x, num) {
+            resList <- list()
+            index <- -1
+            jrdd <- getJRDD(x)
+            numPartitions <- numPartitions(x)
+
+            # TODO(shivaram): Collect more than one partition based on size
+            # estimates similar to the scala version of `take`.
+            while (TRUE) {
+              index <- index + 1
+
+              if (length(resList) >= num || index >= numPartitions)
+                break
+
+              # a JList of byte arrays
+              partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index)))
+              partition <- partitionArr[[1]]
+
+              size <- num - length(resList)
+              # elems is capped to have at most `size` elements
+              elems <- convertJListToRList(partition,
+                                           flatten = TRUE,
+                                           logicalUpperBound = size,
+                                           serializedMode = getSerializedMode(x))
+              # TODO: Check if this append is O(n^2)?
+              resList <- append(resList, elems)
+            }
+            resList
+          })
+
+#' First
+#'
+#' Return the first element of an RDD
+#'
+#' @rdname first
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' first(rdd)
+#' }
+setMethod("first",
+          signature(x = "RDD"),
+          function(x) {
+            take(x, 1)[[1]]
+          })
+
+#' Removes duplicates from an RDD.
+#'
+#' This function returns a new RDD containing the distinct elements in the
+#' given RDD. The same as `distinct()' in Spark.
+#'
+#' @param x The RDD to remove duplicates from.
+#' @param numPartitions Number of partitions to create.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, c(1,2,2,3,3,3))
+#' sort(unlist(collect(distinct(rdd)))) # c(1, 2, 3)
+#'}
+#' @rdname distinct
+#' @aliases distinct,RDD-method
+setMethod("distinct",
+          signature(x = "RDD"),
+          function(x, numPartitions = SparkR::numPartitions(x)) {
+            identical.mapped <- lapply(x, function(x) { list(x, NULL) })
+            reduced <- reduceByKey(identical.mapped,
+                                   function(x, y) { x },
+                                   numPartitions)
+            resRDD <- lapply(reduced, function(x) { x[[1]] })
+            resRDD
+          })
+
+#' Return an RDD that is a sampled subset of the given RDD.
+#'
+#' The same as `sample()' in Spark. (We rename it due to signature
+#' inconsistencies with the `sample()' function in R's base package.)
+#'
+#' @param x The RDD to sample elements from
+#' @param withReplacement Sampling with replacement or not
+#' @param fraction The (rough) sample target fraction
+#' @param seed Randomness seed value
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10) # ensure each num is in its own split
+#' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
+#' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
+#'}
+#' @rdname sampleRDD
+#' @aliases sampleRDD,RDD
+setMethod("sampleRDD",
+          signature(x = "RDD", withReplacement = "logical",
+                    fraction = "numeric", seed = "integer"),
+          function(x, withReplacement, fraction, seed) {
+
+            # The sampler: takes a partition and returns its sampled version.
+            samplingFunc <- function(split, part) {
+              set.seed(seed)
+              res <- vector("list", length(part))
+              len <- 0
+
+              # Discards some random values to ensure each partition has a
+              # different random seed.
+              runif(split)
+
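+              # With replacement, each element is emitted rpois(1, fraction)
+              # times (Poisson sampling); without replacement, each element is
+              # kept independently with probability `fraction`.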
+              for (elem in part) {
+                if (withReplacement) {
+                  count <- rpois(1, fraction)
+                  if (count > 0) {
+                    res[(len + 1):(len + count)] <- rep(list(elem), count)
+                    len <- len + count
+                  }
+                } else {
+                  if (runif(1) < fraction) {
+                    len <- len + 1
+                    res[[len]] <- elem
+                  }
+                }
+              }
+
+              # TODO(zongheng): look into the performance of the current
+              # implementation. Look into some iterator package? Note that
+              # Scala avoids many calls to creating an empty list and PySpark
+              # similarly achieves this using `yield'.
+              if (len > 0)
+                res[1:len]
+              else
+                list()
+            }
+
+            lapplyPartitionsWithIndex(x, samplingFunc)
+          })
+
+#' Return a list of the elements that are a sampled subset of the given RDD.
+#'
+#' @param x The RDD to sample elements from
+#' @param withReplacement Sampling with replacement or not
+#' @param num Number of elements to return
+#' @param seed Randomness seed value
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:100)
+#' # exactly 5 elements sampled, which may not be distinct
+#' takeSample(rdd, TRUE, 5L, 1618L)
+#' # exactly 5 distinct elements sampled
+#' takeSample(rdd, FALSE, 5L, 16181618L)
+#'}
+#' @rdname takeSample
+#' @aliases takeSample,RDD
+setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
+                                  num = "integer", seed = "integer"),
+          function(x, withReplacement, num, seed) {
+            # This function is ported from RDD.scala.
+            fraction <- 0.0
+            total <- 0
+            multiplier <- 3.0
+            initialCount <- count(x)
+            maxSelected <- 0
+            MAXINT <- .Machine$integer.max
+
+            if (num < 0)
+              stop(paste("Negative number of elements requested"))
+
+            if (initialCount > MAXINT - 1) {
+              maxSelected <- MAXINT - 1
+            } else {
+              maxSelected <- initialCount
+            }
+
+            if (num > initialCount && !withReplacement) {
+              total <- maxSelected
+              fraction <- multiplier * (maxSelected + 1) / initialCount
+            } else {
+              total <- num
+              fraction <- multiplier * (num + 1) / initialCount
+            }
+
+            set.seed(seed)
+            samples <- collect(sampleRDD(x, withReplacement, fraction,
+                                         as.integer(ceiling(runif(1,
+                                                                  -MAXINT,
+                                                                  MAXINT)))))
+            # If the first sample didn't turn out large enough, keep trying to
+            # take samples; this shouldn't happen often because we use a big
+            # multiplier for the initial size
+            while (length(samples) < total)
+              samples <- collect(sampleRDD(x, withReplacement, fraction,
+                                           as.integer(ceiling(runif(1,
+                                                                    -MAXINT,
+                                                                    MAXINT)))))
+
+            # TODO(zongheng): investigate if this call is an in-place shuffle?
+            sample(samples)[1:total]
+          })
+
+#' Creates tuples of the elements in this RDD by applying a function.
+#'
+#' @param x The RDD.
+#' @param func The function to be applied.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1, 2, 3))
+#' collect(keyBy(rdd, function(x) { x*x })) # list(list(1, 1), list(4, 2), list(9, 3))
+#'}
+#' @rdname keyBy
+#' @aliases keyBy,RDD
+setMethod("keyBy",
+          signature(x = "RDD", func = "function"),
+          function(x, func) {
+            apply.func <- function(x) {
+              list(func(x), x)
+            }
+            lapply(x, apply.func)
+          })
+
+#' Return a new RDD that has exactly numPartitions partitions.
+#' Can increase or decrease the level of parallelism in this RDD. Internally,
+#' this uses a shuffle to redistribute data.
+#' If you are decreasing the number of partitions in this RDD, consider using
+#' coalesce, which can avoid performing a shuffle.
+#'
+#' @param x The RDD.
+#' @param numPartitions Number of partitions to create.
+#' @seealso coalesce
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5, 6, 7), 4L)
+#' numPartitions(rdd)                   # 4
+#' numPartitions(repartition(rdd, 2L))  # 2
+#'}
+#' @rdname repartition
+#' @aliases repartition,RDD
+setMethod("repartition",
+          signature(x = "RDD", numPartitions = "numeric"),
+          function(x, numPartitions) {
+            coalesce(x, numToInt(numPartitions), TRUE)
+          })
+
+#' Return a new RDD that is reduced into numPartitions partitions.
+#'
+#' @param x The RDD.
+#' @param numPartitions Number of partitions to create.
+#' @seealso repartition
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5), 3L)
+#' numPartitions(rdd)               # 3
+#' numPartitions(coalesce(rdd, 1L)) # 1
+#'}
+#' @rdname coalesce
+#' @aliases coalesce,RDD
+setMethod("coalesce",
+           signature(x = "RDD", numPartitions = "numeric"),
+           function(x, numPartitions, shuffle = FALSE) {
+             numPartitions <- numToInt(numPartitions)
+             if (shuffle || numPartitions > SparkR::numPartitions(x)) {
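+               # Shuffle path: assign elements round-robin to the target
+               # partitions, starting at a random partition for each input
+               # split, then group them with partitionBy() and drop the
+               # generated keys via values().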
+               func <- function(s, part) {
+                 set.seed(s)  # split as seed
+                 start <- as.integer(sample(numPartitions, 1) - 1)
+                 lapply(seq_along(part),
+                        function(i) {
+                          pos <- (start + i) %% numPartitions
+                          list(pos, part[[i]])
+                        })
+               }
+               shuffled <- lapplyPartitionsWithIndex(x, func)
+               repartitioned <- partitionBy(shuffled, numPartitions)
+               values(repartitioned)
+             } else {
+               jrdd <- callJMethod(getJRDD(x), "coalesce", numPartitions, shuffle)
+               RDD(jrdd)
+             }
+           })
+
+#' Save this RDD as a SequenceFile of serialized objects.
+#'
+#' @param x The RDD to save
+#' @param path The directory where the file is saved
+#' @seealso objectFile
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:3)
+#' saveAsObjectFile(rdd, "/tmp/sparkR-tmp")
+#'}
+#' @rdname saveAsObjectFile
+#' @aliases saveAsObjectFile,RDD
+setMethod("saveAsObjectFile",
+          signature(x = "RDD", path = "character"),
+          function(x, path) {
+            # If serializedMode == "string" we need to serialize the data before saving it since
+            # objectFile() assumes serializedMode == "byte".
+            if (getSerializedMode(x) != "byte") {
+              x <- serializeToBytes(x)
+            }
+            # Return nothing
+            invisible(callJMethod(getJRDD(x), "saveAsObjectFile", path))
+          })
+
+#' Save this RDD as a text file, using string representations of elements.
+#'
+#' @param x The RDD to save
+#' @param path The directory where the splits of the text file are saved
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:3)
+#' saveAsTextFile(rdd, "/tmp/sparkR-tmp")
+#'}
+#' @rdname saveAsTextFile
+#' @aliases saveAsTextFile,RDD
+setMethod("saveAsTextFile",
+          signature(x = "RDD", path = "character"),
+          function(x, path) {
+            func <- function(str) {
+              toString(str)
+            }
+            stringRdd <- lapply(x, func)
+            # Return nothing
+            invisible(
+              callJMethod(getJRDD(stringRdd, serializedMode = "string"), "saveAsTextFile", path))
+          })
+
+#' Sort an RDD by the given key function.
+#'
+#' @param x An RDD to be sorted.
+#' @param func A function used to compute the sort key for each element.
+#' @param ascending A flag to indicate whether the sorting is ascending or descending.
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where all elements are sorted.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(3, 2, 1))
+#' collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
+#'}
+#' @rdname sortBy
+#' @aliases sortBy,RDD,RDD-method
+setMethod("sortBy",
+          signature(x = "RDD", func = "function"),
+          function(x, func, ascending = TRUE, numPartitions = SparkR::numPartitions(x)) {          
+            values(sortByKey(keyBy(x, func), ascending, numPartitions))
+          })
+
+# Helper function to get first N elements from an RDD in the specified order.
+# Param:
+#   x An RDD.
+#   num Number of elements to return.
+#   ascending A flag to indicate whether the sorting is ascending or descending.
+# Return:
+#   A list of the first N elements from the RDD in the specified order.
+#
+takeOrderedElem <- function(x, num, ascending = TRUE) {          
+  if (num <= 0L) {
+    return(list())
+  }
+  
+  partitionFunc <- function(part) {
+    if (num < length(part)) {
+      # R limitation: order works only on primitive types!
+      ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending)
+      list(part[ord[1:num]])
+    } else {
+      list(part)
+    }
+  }
+
+  reduceFunc <- function(elems, part) {
+    newElems <- append(elems, part)
+    # R limitation: order works only on primitive types!
+    ord <- order(unlist(newElems, recursive = FALSE), decreasing = !ascending)
+    newElems[ord[1:num]]
+  }
+  
+  newRdd <- mapPartitions(x, partitionFunc)
+  reduce(newRdd, reduceFunc)
+}
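+# For example (illustrative): takeOrderedElem(rdd, 3L) returns the 3 smallest
+# elements, while takeOrderedElem(rdd, 3L, ascending = FALSE) returns the 3 largest.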
+
+#' Returns the first N elements from an RDD in ascending order.
+#'
+#' @param x An RDD.
+#' @param num Number of elements to return.
+#' @return The first N elements from the RDD in ascending order.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
+#' takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
+#'}
+#' @rdname takeOrdered
+#' @aliases takeOrdered,RDD,RDD-method
+setMethod("takeOrdered",
+          signature(x = "RDD", num = "integer"),
+          function(x, num) {          
+            takeOrderedElem(x, num)
+          })
+
+#' Returns the top N elements from an RDD.
+#'
+#' @param x An RDD.
+#' @param num Number of elements to return.
+#' @return The top N elements from the RDD.
+#' @rdname top
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
+#' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
+#'}
+#' @rdname top
+#' @aliases top,RDD,RDD-method
+setMethod("top",
+          signature(x = "RDD", num = "integer"),
+          function(x, num) {          
+            takeOrderedElem(x, num, FALSE)
+          })
+
+#' Fold an RDD using a given associative function and a neutral "zero value".
+#'
+#' Aggregate the elements of each partition, and then the results for all the
+#' partitions, using a given associative function and a neutral "zero value".
+#' 
+#' @param x An RDD.
+#' @param zeroValue A neutral "zero value".
+#' @param op An associative function for the folding operation.
+#' @return The folding result.
+#' @rdname fold
+#' @seealso reduce
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
+#' fold(rdd, 0, "+") # 15
+#'}
+#' @rdname fold
+#' @aliases fold,RDD,RDD-method
+setMethod("fold",
+          signature(x = "RDD", zeroValue = "ANY", op = "ANY"),
+          function(x, zeroValue, op) {
+            aggregateRDD(x, zeroValue, op, op)
+          })
+
+#' Aggregate an RDD using the given combine functions and a neutral "zero value".
+#'
+#' Aggregate the elements of each partition, and then the results for all the
+#' partitions, using given combine functions and a neutral "zero value".
+#' 
+#' @param x An RDD.
+#' @param zeroValue A neutral "zero value".
+#' @param seqOp A function to aggregate the RDD elements. It may return a different
+#'              result type from the type of the RDD elements.
+#' @param combOp A function to aggregate results of seqOp.
+#' @return The aggregation result.
+#' @rdname aggregateRDD
+#' @seealso reduce
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1, 2, 3, 4))
+#' zeroValue <- list(0, 0)
+#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+#' aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
+#'}
+#' @rdname aggregateRDD
+#' @aliases aggregateRDD,RDD,RDD-method
+setMethod("aggregateRDD",
+          signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"),
+          function(x, zeroValue, seqOp, combOp) {        
+            partitionFunc <- function(part) {
+              Reduce(seqOp, part, zeroValue)
+            }
+            
+            partitionList <- collect(lapplyPartition(x, partitionFunc),
+                                     flatten = FALSE)
+            Reduce(combOp, partitionList, zeroValue)
+          })
+
+#' Pipes elements to a forked external process.
+#'
+#' The same as 'pipe()' in Spark.
+#'
+#' @param x The RDD whose elements are piped to the forked external process.
+#' @param command The command to fork an external process.
+#' @param env A named list to set environment variables of the external process.
+#' @return A new RDD created by piping all elements to a forked external process.
+#' @rdname pipeRDD
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' collect(pipeRDD(rdd, "more"))
+#' # Output: c("1", "2", ..., "10")
+#'}
+#' @rdname pipeRDD
+#' @aliases pipeRDD,RDD,character-method
+setMethod("pipeRDD",
+          signature(x = "RDD", command = "character"),
+          function(x, command, env = list()) {
+            func <- function(part) {
+              trim.trailing.func <- function(x) {
+                sub("[\r\n]*$", "", toString(x))
+              }
+              input <- unlist(lapply(part, trim.trailing.func))
+              res <- system2(command, stdout = TRUE, input = input, env = env)
+              lapply(res, trim.trailing.func)
+            }
+            lapplyPartition(x, func)
+          })
+
+# TODO: Consider caching the name in the RDD's environment
+#' Return an RDD's name.
+#'
+#' @param x The RDD whose name is returned.
+#' @rdname name
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1,2,3))
+#' name(rdd) # NULL (if not set before)
+#'}
+#' @rdname name
+#' @aliases name,RDD
+setMethod("name",
+          signature(x = "RDD"),
+          function(x) {
+            callJMethod(getJRDD(x), "name")
+          })
+
+#' Set an RDD's name.
+#'
+#' @param x The RDD whose name is to be set.
+#' @param name The RDD name to be set.
+#' @return The RDD with its name set.
+#' @rdname setName
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1,2,3))
+#' setName(rdd, "myRDD")
+#' name(rdd) # "myRDD"
+#'}
+#' @rdname setName
+#' @aliases setName,RDD
+setMethod("setName",
+          signature(x = "RDD", name = "character"),
+          function(x, name) {
+            callJMethod(getJRDD(x), "setName", name)
+            x
+          })
+
+#' Zip an RDD with generated unique Long IDs.
+#'
+#' Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
+#' n is the number of partitions. So there may be gaps, but this
+#' method won't trigger a Spark job, unlike zipWithIndex.
+#'
+#' @param x An RDD to be zipped.
+#' @return An RDD with zipped items.
+#' @seealso zipWithIndex
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
+#' collect(zipWithUniqueId(rdd)) 
+#' # list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2))
+#'}
+#' @rdname zipWithUniqueId
+#' @aliases zipWithUniqueId,RDD
+setMethod("zipWithUniqueId",
+          signature(x = "RDD"),
+          function(x) {
+            n <- numPartitions(x)
+
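+            # The j-th item (1-based) of partition k (0-based split) gets id
+            # (j - 1) * n + k; e.g. with n = 3 partitions, partition 0 yields
+            # ids 0, 3, 6, ...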
+            partitionFunc <- function(split, part) {
+              mapply(
+                function(item, index) {
+                  list(item, (index - 1) * n + split)
+                },
+                part,
+                seq_along(part),
+                SIMPLIFY = FALSE)
+            }
+
+            lapplyPartitionsWithIndex(x, partitionFunc)
+          })
+
+#' Zip an RDD with its element indices.
+#'
+#' The ordering is first based on the partition index and then the
+#' ordering of items within each partition. So the first item in
+#' the first partition gets index 0, and the last item in the last
+#' partition receives the largest index.
+#'
+#' This method needs to trigger a Spark job when this RDD contains
+#' more than one partition.
+#'
+#' @param x An RDD to be zipped.
+#' @return An RDD with zipped items.
+#' @seealso zipWithUniqueId
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
+#' collect(zipWithIndex(rdd))
+#' # list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4))
+#'}
+#' @rdname zipWithIndex
+#' @aliases zipWithIndex,RDD
+setMethod("zipWithIndex",
+          signature(x = "RDD"),
+          function(x) {
+            n <- numPartitions(x)
+            if (n > 1) {
+              nums <- collect(lapplyPartition(x,
+                                              function(part) {
+                                                list(length(part))
+                                              }))
+              startIndices <- Reduce("+", nums, accumulate = TRUE)
+            }
+
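+            # startIndices[[k]] holds the total number of elements in the first k
+            # partitions, so 0-based partition `split` starts numbering its
+            # elements at startIndices[[split]].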
+            partitionFunc <- function(split, part) {
+              if (split == 0) {
+                startIndex <- 0
+              } else {
+                startIndex <- startIndices[[split]]
+              }
+
+              mapply(
+                function(item, index) {
+                  list(item, index - 1 + startIndex)
+                },
+                part,
+                seq_along(part),
+                SIMPLIFY = FALSE)
+            }
+
+            lapplyPartitionsWithIndex(x, partitionFunc)
+          })
+
+#' Coalesce all elements within each partition of an RDD into a list.
+#'
+#' @param x An RDD.
+#' @return An RDD created by coalescing all elements within
+#'         each partition into a list.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, as.list(1:4), 2L)
+#' collect(glom(rdd))
+#' # list(list(1, 2), list(3, 4))
+#'}
+#' @rdname glom
+#' @aliases glom,RDD
+setMethod("glom",
+          signature(x = "RDD"),
+          function(x) {
+            partitionFunc <- function(part) {
+              list(part)
+            }
+            
+            lapplyPartition(x, partitionFunc)
+          })
+
+############ Binary Functions #############
+
+#' Return the union RDD of two RDDs.
+#' The same as union() in Spark.
+#'
+#' @param x An RDD.
+#' @param y An RDD.
+#' @return a new RDD created by performing the simple union (without removing
+#' duplicates) of two input RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:3)
+#' unionRDD(rdd, rdd) # 1, 2, 3, 1, 2, 3
+#'}
+#' @rdname unionRDD
+#' @aliases unionRDD,RDD,RDD-method
+setMethod("unionRDD",
+          signature(x = "RDD", y = "RDD"),
+          function(x, y) {
+            if (getSerializedMode(x) == getSerializedMode(y)) {
+              jrdd <- callJMethod(getJRDD(x), "union", getJRDD(y))
+              union.rdd <- RDD(jrdd, getSerializedMode(x))
+            } else {
+              # One of the RDDs is not serialized, we need to serialize it first.
+              if (getSerializedMode(x) != "byte") x <- serializeToBytes(x)
+              if (getSerializedMode(y) != "byte") y <- serializeToBytes(y)
+              jrdd <- callJMethod(getJRDD(x), "union", getJRDD(y))
+              union.rdd <- RDD(jrdd, "byte")
+            }
+            union.rdd
+          })
+
+#' Zip an RDD with another RDD.
+#'
+#' Zips this RDD with another one, returning key-value pairs with the
+#' first element in each RDD, second element in each RDD, etc. Assumes
+#' that the two RDDs have the same number of partitions and the same
+#' number of elements in each partition (e.g. one was made through
+#' a map on the other).
+#'
+#' @param x An RDD to be zipped.
+#' @param other Another RDD to be zipped.
+#' @return An RDD zipped from the two RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, 0:4)
+#' rdd2 <- parallelize(sc, 1000:1004)
+#' collect(zipRDD(rdd1, rdd2))
+#' # list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004))
+#'}
+#' @rdname zipRDD
+#' @aliases zipRDD,RDD
+setMethod("zipRDD",
+          signature(x = "RDD", other = "RDD"),
+          function(x, other) {
+            n1 <- numPartitions(x)
+            n2 <- numPartitions(other)
+            if (n1 != n2) {
+              stop("Can only zip RDDs which have the same number of partitions.")
+            }
+
+            if (getSerializedMode(x) != getSerializedMode(other) || 
+                getSerializedMode(x) == "byte") {
+              # Append the number of elements in each partition to that partition so that we can later
+              # check if corresponding partitions of both RDDs have the same number of elements.
+              #
+              # Note that this appending also serves the purpose of reserialization, because even if 
+              # any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
+              # as a single byte array. For example, partitions of an RDD generated from partitionBy()
+              # may be encoded as multiple byte arrays.          
+              appendLength <- function(part) {
+                part[[length(part) + 1]] <- length(part) + 1
+                part
+              }
+              x <- lapplyPartition(x, appendLength)
+              other <- lapplyPartition(other, appendLength)
+            }
+            
+            zippedJRDD <- callJMethod(getJRDD(x), "zip", getJRDD(other))
+            # The zippedRDD's elements are of Scala Tuple2 type. The serialization
+            # flag here is used for the elements inside the tuples.
+            serializerMode <- getSerializedMode(x)
+            zippedRDD <- RDD(zippedJRDD, serializerMode)
+            
+            partitionFunc <- function(split, part) {
+              len <- length(part)
+              if (len > 0) {
+                if (serializerMode == "byte") {
+                  lengthOfValues <- part[[len]]
+                  lengthOfKeys <- part[[len - lengthOfValues]]
+                  stopifnot(len == lengthOfKeys + lengthOfValues)
+                  
+                  # check if corresponding partitions of both RDDs have the same number of elements.
+                  if (lengthOfKeys != lengthOfValues) {
+                    stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
+                  }
+                  
+                  if (lengthOfKeys > 1) {
+                    keys <- part[1 : (lengthOfKeys - 1)]
+                    values <- part[(lengthOfKeys + 1) : (len - 1)]                    
+                  } else {
+                    keys <- list()
+                    values <- list()
+                  }
+                } else {
+                  # Keys, values must have same length here, because this has
+                  # been validated inside the JavaRDD.zip() function.
+                  keys <- part[c(TRUE, FALSE)]
+                  values <- part[c(FALSE, TRUE)]
+                }
+                mapply(
+                    function(k, v) {
+                      list(k, v)
+                    },
+                    keys,
+                    values,
+                    SIMPLIFY = FALSE,
+                    USE.NAMES = FALSE)
+              } else {
+                part
+              }
+            }
+            
+            PipelinedRDD(zippedRDD, partitionFunc)
+          })

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/SQLContext.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
new file mode 100644
index 0000000..930ada2
--- /dev/null
+++ b/R/pkg/R/SQLContext.R
@@ -0,0 +1,520 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# SQLContext.R: SQLContext-driven functions
+
+#' Infer the SQL type of an R object
+infer_type <- function(x) {
+  if (is.null(x)) {
+    stop("can not infer type from NULL")
+  }
+
+  # class of POSIXlt is c("POSIXlt" "POSIXt")
+  type <- switch(class(x)[[1]],
+                 integer = "integer",
+                 character = "string",
+                 logical = "boolean",
+                 double = "double",
+                 numeric = "double",
+                 raw = "binary",
+                 list = "array",
+                 environment = "map",
+                 Date = "date",
+                 POSIXlt = "timestamp",
+                 POSIXct = "timestamp",
+                 stop(paste("Unsupported type for DataFrame:", class(x))))
+
+  if (type == "map") {
+    stopifnot(length(x) > 0)
+    key <- ls(x)[[1]]
+    list(type = "map",
+         keyType = "string",
+         valueType = infer_type(get(key, x)),
+         valueContainsNull = TRUE)
+  } else if (type == "array") {
+    stopifnot(length(x) > 0)
+    names <- names(x)
+    if (is.null(names)) {
+      list(type = "array", elementType = infer_type(x[[1]]), containsNull = TRUE)
+    } else {
+      # StructType
+      types <- lapply(x, infer_type)
+      fields <- lapply(1:length(x), function(i) {
+        list(name = names[[i]], type = types[[i]], nullable = TRUE)
+      })
+      list(type = "struct", fields = fields)
+    }
+  } else if (length(x) > 1) {
+    list(type = "array", elementType = type, containsNull = TRUE)
+  } else {
+    type
+  }
+}
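+# For example (illustrative):
+#   infer_type(1L)                     # "integer"
+#   infer_type(c("a", "b"))            # array of strings
+#   infer_type(list(a = 1L, b = "x"))  # struct with fields a (integer) and b (string)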
+
+#' Dump the schema into a JSON string
+tojson <- function(x) {
+  if (is.list(x)) {
+    names <- names(x)
+    if (!is.null(names)) {
+      items <- lapply(names, function(n) {
+        safe_n <- gsub('"', '\\"', n)
+        paste(tojson(safe_n), ':', tojson(x[[n]]), sep = '')
+      })
+      d <- paste(items, collapse = ', ')
+      paste('{', d, '}', sep = '')
+    } else {
+      l <- paste(lapply(x, tojson), collapse = ', ')
+      paste('[', l, ']', sep = '')
+    }
+  } else if (is.character(x)) {
+    paste('"', x, '"', sep = '')
+  } else if (is.logical(x)) {
+    if (x) "true" else "false"
+  } else {
+    stop(paste("unexpected type:", class(x)))
+  }
+}
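+# For example (illustrative):
+#   tojson(list(name = "a", nullable = TRUE))  # '{"name":"a", "nullable":true}'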
+
+#' Create a DataFrame from an RDD
+#'
+#' Converts an RDD to a DataFrame by inferring the types.
+#'
+#' @param sqlCtx A SQLContext
+#' @param data An RDD or list or data.frame
+#' @param schema a list of column names or named list (StructType), optional
+#' @return a DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
+#' df <- createDataFrame(sqlCtx, rdd)
+#' }
+
+# TODO(davies): support sampling and infer type from NA
+createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
+  if (is.data.frame(data)) {
+      # get the names of columns, they will be put into RDD
+      schema <- names(data)
+      n <- nrow(data)
+      m <- ncol(data)
+      # get rid of factor type
+      dropFactor <- function(x) {
+        if (is.factor(x)) {
+          as.character(x)
+        } else {
+          x
+        }
+      }
+      data <- lapply(1:n, function(i) {
+        lapply(1:m, function(j) { dropFactor(data[i,j]) })
+      })
+  }
+  if (is.list(data)) {
+    sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sqlCtx)
+    rdd <- parallelize(sc, data)
+  } else if (inherits(data, "RDD")) {
+    rdd <- data
+  } else {
+    stop(paste("unexpected type:", class(data)))
+  }
+
+  if (is.null(schema) || is.null(names(schema))) {
+    row <- first(rdd)
+    names <- if (is.null(schema)) {
+      names(row)
+    } else {
+      as.list(schema)
+    }
+    if (is.null(names)) {
+      names <- lapply(1:length(row), function(x) {
+       paste("_", as.character(x), sep = "")
+      })
+    }
+
+    # Spark SQL does not support '.' in column names, so replace it with '_'
+    # TODO(davies): remove this once SPARK-2775 is fixed
+    names <- lapply(names, function(n) {
+      nn <- gsub("[.]", "_", n)
+      if (nn != n) {
+        warning(paste("Use", nn, "instead of", n, "as the column name"))
+      }
+      nn
+    })
+
+    types <- lapply(row, infer_type)
+    fields <- lapply(1:length(row), function(i) {
+      list(name = names[[i]], type = types[[i]], nullable = TRUE)
+    })
+    schema <- list(type = "struct", fields = fields)
+  }
+
+  stopifnot(class(schema) == "list")
+  stopifnot(schema$type == "struct")
+  stopifnot(class(schema$fields) == "list")
+  schemaString <- tojson(schema)
+
+  jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
+  srdd <- callJMethod(jrdd, "rdd")
+  sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF",
+                     srdd, schemaString, sqlCtx)
+  dataFrame(sdf)
+}
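+
+# A hedged usage sketch for the data.frame branch above (sqlCtx is assumed to be
+# an initialized SQLContext); `faithful` is a data set shipped with base R:
+#   df <- createDataFrame(sqlCtx, faithful)
+#   # column names are taken from the data.frame, and any factor columns would
+#   # be converted to character before being parallelized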
+
+#' toDF
+#'
+#' Converts an RDD to a DataFrame by inferring the types.
+#'
+#' @param x An RDD
+#'
+#' @rdname DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
+#' df <- toDF(rdd)
+#' }
+
+setGeneric("toDF", function(x, ...) { standardGeneric("toDF") })
+
+setMethod("toDF", signature(x = "RDD"),
+          function(x, ...) {
+            sqlCtx <- if (exists(".sparkRHivesc", envir = .sparkREnv)) {
+              get(".sparkRHivesc", envir = .sparkREnv)
+            } else if (exists(".sparkRSQLsc", envir = .sparkREnv)) {
+              get(".sparkRSQLsc", envir = .sparkREnv)
+            } else {
+              stop("no SQL context available")
+            }
+            createDataFrame(sqlCtx, x, ...)
+          })
+
+#' Create a DataFrame from a JSON file.
+#'
+#' Loads a JSON file (one object per line), returning the result as a DataFrame.
+#' It goes through the entire dataset once to determine the schema.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @return DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' }
+
+jsonFile <- function(sqlCtx, path) {
+  # Allow the user to have a more flexible definition of the JSON file path
+  path <- normalizePath(path)
+  # Convert a string vector of paths to a string containing comma separated paths
+  path <- paste(path, collapse = ",")
+  sdf <- callJMethod(sqlCtx, "jsonFile", path)
+  dataFrame(sdf)
+}
+
+
+#' JSON RDD
+#'
+#' Loads an RDD storing one JSON object per string as a DataFrame.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param rdd An RDD of JSON string
+#' @param schema A StructType object to use as schema
+#' @param samplingRatio The ratio of sampling used to infer the schema
+#' @return A DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' rdd <- textFile(sc, "path/to/json")
+#' df <- jsonRDD(sqlCtx, rdd)
+#' }
+
+# TODO: support schema
+jsonRDD <- function(sqlCtx, rdd, schema = NULL, samplingRatio = 1.0) {
+  rdd <- serializeToString(rdd)
+  if (is.null(schema)) {
+    sdf <- callJMethod(sqlCtx, "jsonRDD", callJMethod(getJRDD(rdd), "rdd"), samplingRatio)
+    dataFrame(sdf)
+  } else {
+    stop("not implemented")
+  }
+}
+
+
+#' Create a DataFrame from a Parquet file.
+#' 
+#' Loads a Parquet file, returning the result as a DataFrame.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param ... Path(s) of parquet file(s) to read.
+#' @return DataFrame
+#' @export
+
+# TODO: Implement saveAsParquetFile and write examples for both
+parquetFile <- function(sqlCtx, ...) {
+  # Allow the user to have a more flexible definition of the Parquet file path
+  paths <- lapply(list(...), normalizePath)
+  sdf <- callJMethod(sqlCtx, "parquetFile", paths)
+  dataFrame(sdf)
+}
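+
+# A hedged usage sketch (paths are hypothetical), pending the examples noted in
+# the TODO above:
+#   df <- parquetFile(sqlCtx, "path/to/part-1.parquet", "path/to/part-2.parquet")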
+
+#' SQL Query
+#' 
+#' Executes a SQL query using Spark, returning the result as a DataFrame.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param sqlQuery A character vector containing the SQL query
+#' @return DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' registerTempTable(df, "table")
+#' new_df <- sql(sqlCtx, "SELECT * FROM table")
+#' }
+
+sql <- function(sqlCtx, sqlQuery) {
+  sdf <- callJMethod(sqlCtx, "sql", sqlQuery)
+  dataFrame(sdf)
+}
+
+
+#' Create a DataFrame from a SparkSQL Table
+#' 
+#' Returns the specified table as a DataFrame. The table must have already been
+#' registered in the SQLContext.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param tableName The SparkSQL Table to convert to a DataFrame.
+#' @return DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' registerTempTable(df, "table")
+#' new_df <- table(sqlCtx, "table")
+#' }
+
+table <- function(sqlCtx, tableName) {
+  sdf <- callJMethod(sqlCtx, "table", tableName)
+  dataFrame(sdf) 
+}
+
+
+#' Tables
+#'
+#' Returns a DataFrame containing names of tables in the given database.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param databaseName name of the database
+#' @return a DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' tables(sqlCtx, "hive")
+#' }
+
+tables <- function(sqlCtx, databaseName = NULL) {
+  jdf <- if (is.null(databaseName)) {
+    callJMethod(sqlCtx, "tables")
+  } else {
+    callJMethod(sqlCtx, "tables", databaseName)
+  }
+  dataFrame(jdf)
+}
+
+
+#' Table Names
+#'
+#' Returns the names of tables in the given database as an array.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param databaseName name of the database
+#' @return a list of table names
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' tableNames(sqlCtx, "hive")
+#' }
+
+tableNames <- function(sqlCtx, databaseName = NULL) {
+  if (is.null(databaseName)) {
+    callJMethod(sqlCtx, "tableNames")
+  } else {
+    callJMethod(sqlCtx, "tableNames", databaseName)
+  }
+}
+
+
+#' Cache Table
+#' 
+#' Caches the specified table in-memory.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param tableName The name of the table being cached
+#' @return DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' registerTempTable(df, "table")
+#' cacheTable(sqlCtx, "table")
+#' }
+
+cacheTable <- function(sqlCtx, tableName) {
+  callJMethod(sqlCtx, "cacheTable", tableName)  
+}
+
+#' Uncache Table
+#' 
+#' Removes the specified table from the in-memory cache.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param tableName The name of the table being uncached
+#' @return DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' registerTempTable(df, "table")
+#' uncacheTable(sqlCtx, "table")
+#' }
+
+uncacheTable <- function(sqlCtx, tableName) {
+  callJMethod(sqlCtx, "uncacheTable", tableName)
+}
+
+#' Clear Cache
+#'
+#' Removes all cached tables from the in-memory cache.
+#'
+#' @param sqlCtx SQLContext to use
+#' @examples
+#' \dontrun{
+#' clearCache(sqlCtx)
+#' }
+
+clearCache <- function(sqlCtx) {
+  callJMethod(sqlCtx, "clearCache")
+}
+
+#' Drop Temporary Table
+#'
+#' Drops the temporary table with the given table name in the catalog.
+#' If the table has been cached/persisted before, it's also unpersisted.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param tableName The name of the SparkSQL table to be dropped.
+#' @examples
+#' \dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df <- loadDF(sqlCtx, path, "parquet")
+#' registerTempTable(df, "table")
+#' dropTempTable(sqlCtx, "table")
+#' }
+
+dropTempTable <- function(sqlCtx, tableName) {
+  if (class(tableName) != "character") {
+    stop("tableName must be a string.")
+  }
+  callJMethod(sqlCtx, "dropTempTable", tableName)
+}
+
+#' Load a DataFrame
+#'
+#' Returns the dataset in a data source as a DataFrame.
+#'
+#' The data source is specified by the `source` and a set of options(...).
+#' If `source` is not specified, the default data source configured by
+#' "spark.sql.sources.default" will be used.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param path The path of files to load
+#' @param source the name of external data source
+#' @return DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df <- loadDF(sqlCtx, "path/to/file.json", source = "json")
+#' }
+
+loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
+  options <- varargsToEnv(...)
+  if (!is.null(path)) {
+    options[['path']] <- path
+  }
+  sdf <- callJMethod(sqlCtx, "load", source, options)
+  dataFrame(sdf)
+}
+
+#' Create an external table
+#'
+#' Creates an external table based on the dataset in a data source and
+#' returns the DataFrame associated with the external table.
+#'
+#' The data source is specified by the `source` and a set of options(...).
+#' If `source` is not specified, the default data source configured by
+#' "spark.sql.sources.default" will be used.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param tableName A name of the table
+#' @param path The path of files to load
+#' @param source the name of external data source
+#' @return DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df <- createExternalTable(sqlCtx, "myjson", path="path/to/json", source="json")
+#' }
+
+createExternalTable <- function(sqlCtx, tableName, path = NULL, source = NULL, ...) {
+  options <- varargsToEnv(...)
+  if (!is.null(path)) {
+    options[['path']] <- path
+  }
+  sdf <- callJMethod(sqlCtx, "createExternalTable", tableName, source, options)
+  dataFrame(sdf)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/SQLTypes.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLTypes.R b/R/pkg/R/SQLTypes.R
new file mode 100644
index 0000000..962fba5
--- /dev/null
+++ b/R/pkg/R/SQLTypes.R
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Utility functions for handling SparkSQL DataTypes.
+
+# Handler for StructType
+structType <- function(st) {
+  obj <- structure(new.env(parent = emptyenv()), class = "structType")
+  obj$jobj <- st
+  obj$fields <- function() { lapply(callJMethod(st, "fields"), structField) }
+  obj
+}
+
+#' Print a Spark StructType.
+#'
+#' This function prints the contents of a StructType returned from the
+#' SparkR JVM backend.
+#'
+#' @param x A StructType object
+#' @param ... further arguments passed to or from other methods
+print.structType <- function(x, ...) {
+  fieldsList <- lapply(x$fields(), function(i) { i$print() })
+  print(fieldsList)
+}
+
+# Handler for StructField
+structField <- function(sf) {
+  obj <- structure(new.env(parent = emptyenv()), class = "structField")
+  obj$jobj <- sf
+  obj$name <- function() { callJMethod(sf, "name") }
+  obj$dataType <- function() { callJMethod(sf, "dataType") }
+  obj$dataType.toString <- function() { callJMethod(obj$dataType(), "toString") }
+  obj$dataType.simpleString <- function() { callJMethod(obj$dataType(), "simpleString") }
+  obj$nullable <- function() { callJMethod(sf, "nullable") }
+  obj$print <- function() { paste("StructField(", 
+                     paste(obj$name(), obj$dataType.toString(), obj$nullable(), sep = ", "),
+                     ")", sep = "") }
+  obj
+}
+
+#' Print a Spark StructField.
+#'
+#' This function prints the contents of a StructField returned from the
+#' SparkR JVM backend.
+#'
+#' @param x A StructField object
+#' @param ... further arguments passed to or from other methods
+print.structField <- function(x, ...) {
+  cat(x$print())
+}
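+
+# A hedged usage sketch (not part of the original commit): `st` is assumed to be
+# a jobj for a JVM StructType obtained through the backend:
+#   s <- structType(st)
+#   print(s)                   # prints a list of "StructField(...)" strings
+#   f <- s$fields()[[1]]
+#   f$name(); f$dataType.simpleString(); f$nullable()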

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/backend.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/backend.R b/R/pkg/R/backend.R
new file mode 100644
index 0000000..2fb6fae
--- /dev/null
+++ b/R/pkg/R/backend.R
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Methods to call into SparkRBackend.
+
+
+# Returns TRUE if object is an instance of given class
+isInstanceOf <- function(jobj, className) {
+  stopifnot(class(jobj) == "jobj")
+  cls <- callJStatic("java.lang.Class", "forName", className)
+  callJMethod(cls, "isInstance", jobj)
+}
+
+# Call a Java method named methodName on the object
+# specified by objId. objId should be a "jobj" returned
+# from the SparkRBackend.
+callJMethod <- function(objId, methodName, ...) {
+  stopifnot(class(objId) == "jobj")
+  if (!isValidJobj(objId)) {
+    stop("Invalid jobj ", objId$id,
+         ". If SparkR was restarted, Spark operations need to be re-executed.")
+  }
+  invokeJava(isStatic = FALSE, objId$id, methodName, ...)
+}
+
+# Call a static method on a specified className
+callJStatic <- function(className, methodName, ...) {
+  invokeJava(isStatic = TRUE, className, methodName, ...)
+}
+
+# Create a new object of the specified class name
+newJObject <- function(className, ...) {
+  invokeJava(isStatic = TRUE, className, methodName = "<init>", ...)
+}
+
+# Remove an object from the SparkR backend. This is done
+# automatically when a jobj is garbage collected.
+removeJObject <- function(objId) {
+  invokeJava(isStatic = TRUE, "SparkRHandler", "rm", objId)
+}
+
+isRemoveMethod <- function(isStatic, objId, methodName) {
+  isStatic == TRUE && objId == "SparkRHandler" && methodName == "rm"
+}
+
+# Invoke a Java method on the SparkR backend. Users
+# should typically use one of the higher level methods like
+# callJMethod, callJStatic etc. instead of using this.
+#
+# isStatic - TRUE if the method to be called is static
+# objId - String that refers to the object on which method is invoked
+#         Should be a jobj id for non-static methods and the classname
+#         for static methods
+# methodName - name of method to be invoked
+invokeJava <- function(isStatic, objId, methodName, ...) {
+  if (!exists(".sparkRCon", .sparkREnv)) {
+    stop("No connection to backend found. Please re-run sparkR.init")
+  }
+
+  # If this isn't a removeJObject call
+  if (!isRemoveMethod(isStatic, objId, methodName)) {
+    objsToRemove <- ls(.toRemoveJobjs)
+    if (length(objsToRemove) > 0) {
+      sapply(objsToRemove,
+            function(e) {
+              removeJObject(e)
+            })
+      rm(list = objsToRemove, envir = .toRemoveJobjs)
+    }
+  }
+
+
+  rc <- rawConnection(raw(0), "r+")
+
+  writeBoolean(rc, isStatic)
+  writeString(rc, objId)
+  writeString(rc, methodName)
+
+  args <- list(...)
+  writeInt(rc, length(args))
+  writeArgs(rc, args)
+
+  # Construct the whole request message to send it once,
+  # avoiding write-write-read pattern in case of Nagle's algorithm.
+  # Refer to http://en.wikipedia.org/wiki/Nagle%27s_algorithm for the details.
+  bytesToSend <- rawConnectionValue(rc)
+  close(rc)
+  rc <- rawConnection(raw(0), "r+")
+  writeInt(rc, length(bytesToSend))
+  writeBin(bytesToSend, rc)
+  requestMessage <- rawConnectionValue(rc)
+  close(rc)
+
+  conn <- get(".sparkRCon", .sparkREnv)
+  writeBin(requestMessage, conn)
+
+  # TODO: check the status code to output error information
+  returnStatus <- readInt(conn)
+  stopifnot(returnStatus == 0)
+  readObject(conn)
+}
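+
+# For reference (derived from the code above, not normative protocol
+# documentation), the request written to the socket has the layout:
+#   [int: length of payload][payload]
+# where payload = [bool isStatic][string objId or className][string methodName]
+#                 [int number of args][serialized args ...]
+# and the reply starts with an int status code (0 = success) followed by the
+# serialized return value read by readObject().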

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/broadcast.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/broadcast.R b/R/pkg/R/broadcast.R
new file mode 100644
index 0000000..583fa2e
--- /dev/null
+++ b/R/pkg/R/broadcast.R
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# S4 class representing Broadcast variables
+
+# Hidden environment that holds values for broadcast variables
+# This will not be serialized / shipped by default
+.broadcastNames <- new.env()
+.broadcastValues <- new.env()
+.broadcastIdToName <- new.env()
+
+#' @title S4 class that represents a Broadcast variable
+#' @description Broadcast variables can be created using the broadcast
+#'              function from a \code{SparkContext}.
+#' @rdname broadcast-class
+#' @seealso broadcast 
+#'
+#' @param id Id of the backing Spark broadcast variable 
+#' @export
+setClass("Broadcast", slots = list(id = "character"))
+
+#' @rdname broadcast-class
+#' @param value Value of the broadcast variable
+#' @param jBroadcastRef reference to the backing Java broadcast object
+#' @param objName name of broadcasted object
+#' @export
+Broadcast <- function(id, value, jBroadcastRef, objName) {
+  .broadcastValues[[id]] <- value
+  .broadcastNames[[as.character(objName)]] <- jBroadcastRef
+  .broadcastIdToName[[id]] <- as.character(objName)
+  new("Broadcast", id = id)
+}
+
+#' @description
+#' \code{value} can be used to get the value of a broadcast variable inside
+#' a distributed function.
+#'
+#' @param bcast The broadcast variable to get
+#' @rdname broadcast
+#' @aliases value,Broadcast-method
+setMethod("value",
+          signature(bcast = "Broadcast"),
+          function(bcast) {
+            if (exists(bcast@id, envir = .broadcastValues)) {
+              get(bcast@id, envir = .broadcastValues)
+            } else {
+              NULL
+            }
+          })
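+
+# A hedged usage sketch (not part of the original commit); `broadcast` is
+# defined in context.R and `sc` is assumed to be an existing SparkContext:
+#   nums <- 1:100
+#   b <- broadcast(sc, nums)
+#   value(b)   # returns the broadcast value stored in .broadcastValues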
+
+#' Internal function to set values of a broadcast variable.
+#'
+#' This function is used internally by Spark to set the value of a broadcast
+#' variable on workers. Not intended for use outside the package.
+#'
+#' @rdname broadcast-internal
+#' @seealso broadcast, value 
+
+#' @param bcastId The id of broadcast variable to set
+#' @param value The value to be set
+#' @export
+setBroadcastValue <- function(bcastId, value) {
+  bcastIdStr <- as.character(bcastId)
+  .broadcastValues[[bcastIdStr]] <- value
+}
+
+#' Helper function to clear the list of broadcast variables we know about.
+#' Should be called when the SparkR JVM backend is shut down.
+clearBroadcastVariables <- function() {
+  bcasts <- ls(.broadcastNames)
+  rm(list = bcasts, envir = .broadcastNames)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/client.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R
new file mode 100644
index 0000000..1281c41
--- /dev/null
+++ b/R/pkg/R/client.R
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Client code to connect to SparkRBackend
+
+# Creates a SparkR client connection object
+# if one doesn't already exist
+connectBackend <- function(hostname, port, timeout = 6000) {
+  if (exists(".sparkRCon", envir = .sparkREnv)) {
+    if (isOpen(.sparkREnv[[".sparkRCon"]])) {
+      cat("SparkRBackend client connection already exists\n")
+      return(get(".sparkRCon", envir = .sparkREnv))
+    }
+  }
+
+  con <- socketConnection(host = hostname, port = port, server = FALSE,
+                          blocking = TRUE, open = "wb", timeout = timeout)
+
+  assign(".sparkRCon", con, envir = .sparkREnv)
+  con
+}
+
+launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts) {
+  if (.Platform$OS.type == "unix") {
+    sparkSubmitBinName <- "spark-submit"
+  } else {
+    sparkSubmitBinName <- "spark-submit.cmd"
+  }
+
+  if (sparkHome != "") {
+    sparkSubmitBin <- file.path(sparkHome, "bin", sparkSubmitBinName)
+  } else {
+    sparkSubmitBin <- sparkSubmitBinName
+  }
+
+  if (jars != "") {
+    jars <- paste("--jars", jars)
+  }
+
+  combinedArgs <- paste(jars, sparkSubmitOpts, args, sep = " ")
+  cat("Launching java with spark-submit command", sparkSubmitBin, combinedArgs, "\n")
+  invisible(system2(sparkSubmitBin, combinedArgs, wait = FALSE))
+}
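+
+# As a rough illustration (all values hypothetical), with sparkHome = "/opt/spark",
+# jars = "extra.jar" and sparkSubmitOpts = "--master local[2]", the code above
+# composes roughly:
+#   /opt/spark/bin/spark-submit --jars extra.jar --master local[2] <args>
+# and runs it in the background via system2(wait = FALSE).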

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/column.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/column.R b/R/pkg/R/column.R
new file mode 100644
index 0000000..e196305
--- /dev/null
+++ b/R/pkg/R/column.R
@@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Column Class
+
+#' @include generics.R jobj.R
+NULL
+
+setOldClass("jobj")
+
+#' @title S4 class that represents a DataFrame column
+#' @description The column class supports unary and binary operations on DataFrame columns.
+
+#' @rdname column
+#'
+#' @param jc reference to JVM DataFrame column
+#' @export
+setClass("Column",
+         slots = list(jc = "jobj"))
+
+setMethod("initialize", "Column", function(.Object, jc) {
+  .Object@jc <- jc
+  .Object
+})
+
+column <- function(jc) {
+  new("Column", jc)
+}
+
+col <- function(x) {
+  column(callJStatic("org.apache.spark.sql.functions", "col", x))
+}
+
+#' @rdname show
+setMethod("show", "Column",
+          function(object) {
+            cat("Column", callJMethod(object@jc, "toString"), "\n")
+          })
+
+operators <- list(
+  "+" = "plus", "-" = "minus", "*" = "multiply", "/" = "divide", "%%" = "mod",
+  "==" = "equalTo", ">" = "gt", "<" = "lt", "!=" = "notEqual", "<=" = "leq", ">=" = "geq",
+  # we can not override `&&` and `||`, so use `&` and `|` instead
+  "&" = "and", "|" = "or" #, "!" = "unary_$bang"
+)
+column_functions1 <- c("asc", "desc", "isNull", "isNotNull")
+column_functions2 <- c("like", "rlike", "startsWith", "endsWith", "getField", "getItem", "contains")
+functions <- c("min", "max", "sum", "avg", "mean", "count", "abs", "sqrt",
+               "first", "last", "lower", "upper", "sumDistinct")
+
+createOperator <- function(op) {
+  setMethod(op,
+            signature(e1 = "Column"),
+            function(e1, e2) {
+              jc <- if (missing(e2)) {
+                if (op == "-") {
+                  callJMethod(e1@jc, "unary_$minus")
+                } else {
+                  callJMethod(e1@jc, operators[[op]])
+                }
+              } else {
+                if (class(e2) == "Column") {
+                  e2 <- e2@jc
+                }
+                callJMethod(e1@jc, operators[[op]], e2)
+              }
+              column(jc)
+            })
+}
+
+createColumnFunction1 <- function(name) {
+  setMethod(name,
+            signature(x = "Column"),
+            function(x) {
+              column(callJMethod(x@jc, name))
+            })
+}
+
+createColumnFunction2 <- function(name) {
+  setMethod(name,
+            signature(x = "Column"),
+            function(x, data) {
+              if (class(data) == "Column") {
+                data <- data@jc
+              }
+              jc <- callJMethod(x@jc, name, data)
+              column(jc)
+            })
+}
+
+createStaticFunction <- function(name) {
+  setMethod(name,
+            signature(x = "Column"),
+            function(x) {
+              jc <- callJStatic("org.apache.spark.sql.functions", name, x@jc)
+              column(jc)
+            })
+}
+
+createMethods <- function() {
+  for (op in names(operators)) {
+    createOperator(op)
+  }
+  for (name in column_functions1) {
+    createColumnFunction1(name)
+  }
+  for (name in column_functions2) {
+    createColumnFunction2(name)
+  }
+  for (x in functions) {
+    createStaticFunction(x)
+  }
+}
+
+createMethods()
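+
+# A hedged usage sketch of the generated methods (assuming `df` is a DataFrame
+# whose `$` accessor, defined in DataFrame.R, returns a Column):
+#   df$age + 1           # Column "plus"
+#   df$age >= 21         # Column "geq"
+#   df$name != "spark"   # Column "notEqual"
+#   isNull(df$age)       # Column "isNull"
+#   lower(df$name)       # org.apache.spark.sql.functions.lower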
+
+#' alias
+#'
+#' Set a new name for a column
+setMethod("alias",
+          signature(object = "Column"),
+          function(object, data) {
+            if (is.character(data)) {
+              column(callJMethod(object@jc, "as", data))
+            } else {
+              stop("data should be character")
+            }
+          })
+
+#' An expression that returns a substring.
+#'
+#' @param start starting position
+#' @param stop ending position
+setMethod("substr", signature(x = "Column"),
+          function(x, start, stop) {
+            jc <- callJMethod(x@jc, "substr", as.integer(start - 1), as.integer(stop - start + 1))
+            column(jc)
+          })
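+
+# Note (derived from the code above): the 1-based, inclusive R positions are
+# translated into a (start - 1, stop - start + 1) pair before calling the JVM
+# Column.substr, e.g. substr(df$name, 1, 3) becomes substr(0, 3) on the JVM side.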
+
+#' Casts the column to a different data type.
+#' @examples
+#' \dontrun{
+#'   cast(df$age, "string")
+#'   cast(df$name, list(type="array", elementType="byte", containsNull = TRUE))
+#' }
+setMethod("cast",
+          signature(x = "Column"),
+          function(x, dataType) {
+            if (is.character(dataType)) {
+              column(callJMethod(x@jc, "cast", dataType))
+            } else if (is.list(dataType)) {
+              json <- tojson(dataType)
+              jdataType <- callJStatic("org.apache.spark.sql.types.DataType", "fromJson", json)
+              column(callJMethod(x@jc, "cast", jdataType))
+            } else {
+              stop("dataType should be character or list")
+            }
+          })
+
+#' Approx Count Distinct
+#'
+#' Returns the approximate number of distinct items in a group.
+#'
+setMethod("approxCountDistinct",
+          signature(x = "Column"),
+          function(x, rsd = 0.95) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "approxCountDistinct", x@jc, rsd)
+            column(jc)
+          })
+
+#' Count Distinct
+#'
+#' Returns the number of distinct items in a group.
+#'
+setMethod("countDistinct",
+          signature(x = "Column"),
+          function(x, ...) {
+            jcol <- lapply(list(...), function (x) {
+              x@jc
+            })
+            jc <- callJStatic("org.apache.spark.sql.functions", "countDistinct", x@jc,
+                              listToSeq(jcol))
+            column(jc)
+          })
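+
+# A hedged usage sketch (assuming `df` is a DataFrame with the given columns):
+#   countDistinct(df$name)
+#   countDistinct(df$name, df$age)   # additional columns are passed through ...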
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org