You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sh...@apache.org on 2015/04/09 07:46:01 UTC
[5/7] spark git commit: [SPARK-5654] Integrate SparkR
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/RDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
new file mode 100644
index 0000000..604ad03
--- /dev/null
+++ b/R/pkg/R/RDD.R
@@ -0,0 +1,1539 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# RDD in R implemented in S4 OO system.
+
+setOldClass("jobj") # register the S3 class "jobj" so it can be used as an S4 slot type below
+
+#' @title S4 class that represents an RDD
+#' @description RDD can be created using functions like
+#' \code{parallelize}, \code{textFile} etc.
+#' @rdname RDD
+#' @seealso parallelize, textFile
+#'
+#' @slot env An R environment that stores the mutable bookkeeping state of the RDD
+#' @slot jrdd Java object reference (class \code{jobj}) to the JavaRDD
+#' backing this RDD
+#' @export
+setClass("RDD",
+ slots = list(env = "environment",
+ jrdd = "jobj"))
+
+setClass("PipelinedRDD", # an RDD defined as a function applied on top of a parent RDD
+ slots = list(prev = "RDD", # the parent RDD
+ func = "function", # called as func(split, iterator) on each partition
+ prev_jrdd = "jobj"), # JavaRDD that func is ultimately applied to
+ contains = "RDD")
+
+# Initializer for RDD objects; runs when new("RDD", ...) is called.
+#
+# Arguments:
+#   .Object         the prototype object handed in by new()
+#   jrdd            Java object reference to the backing JavaRDD
+#   serializedMode  a string, one of
+#                     "byte":   the RDD stores data serialized in R
+#                     "string": the RDD stores data as strings
+#                     "row":    the RDD stores the serialized rows of a DataFrame
+#   isCached        initial value of the isCached flag
+#   isCheckpointed  initial value of the isCheckpointed flag
+# Returns the initialized object.
+setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
+                                        isCached, isCheckpointed) {
+  # Check that RDD constructor is using the correct version of serializedMode.
+  # is.character() is used instead of comparing class() with `==`, which is
+  # fragile when an object carries more than one class.
+  stopifnot(is.character(serializedMode))
+  stopifnot(serializedMode %in% c("byte", "string", "row"))
+
+  # We use an environment to store mutable states inside an RDD object.
+  # Note that R's call-by-value semantics makes modifying slots inside an
+  # object (passed as an argument into a function, such as cache()) difficult:
+  # i.e. one needs to make a copy of the RDD object and set the new slot value
+  # there. An environment is passed by reference, so flags stored in it can be
+  # updated in place.
+  .Object@env <- new.env()
+  .Object@env$isCached <- isCached
+  .Object@env$isCheckpointed <- isCheckpointed
+  .Object@env$serializedMode <- serializedMode
+
+  .Object@jrdd <- jrdd
+  .Object
+})
+
+setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) { # builds an RDD defined as `func` applied on top of `prev`
+ .Object@env <- new.env() # mutable state lives in an environment so it can be updated in place
+ .Object@env$isCached <- FALSE
+ .Object@env$isCheckpointed <- FALSE
+ .Object@env$jrdd_val <- jrdd_val # stays NULL until getJRDD() builds the JavaRDD
+ if (!is.null(jrdd_val)) {
+ # A JavaRDD was supplied, so record its serialization mode (copied from prev)
+ .Object@env$serializedMode <- prev@env$serializedMode
+ }
+
+ .Object@prev <- prev
+
+ isPipelinable <- function(rdd) { # TRUE when rdd is neither cached nor checkpointed
+ e <- rdd@env
+ !(e$isCached || e$isCheckpointed)
+ }
+
+ if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
+ # prev cannot be pipelined into, so this transformation is the first in its stage:
+ .Object@func <- func
+ .Object@prev_jrdd <- getJRDD(prev)
+ .Object@env$prev_serializedMode <- prev@env$serializedMode
+ # NOTE: prev_serializedMode tracks the serialization mode of prev_jrdd;
+ # it is read later, when getJRDD performs the delayed construction of the JavaRDD
+ } else {
+ pipelinedFunc <- function(split, iterator) { # compose: prev's function runs first, then func
+ func(split, prev@func(split, iterator))
+ }
+ .Object@func <- pipelinedFunc
+ .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
+ # Inherit the serialization mode that prev recorded for that same prev_jrdd
+ .Object@env$prev_serializedMode <- prev@env$prev_serializedMode
+ }
+
+ .Object
+})
+
+#' @rdname RDD
+#' @export
+#'
+#' @param jrdd Reference to the JavaRDD that backs the new RDD
+#' @param serializedMode How elements are stored: "byte" for data serialized in R,
+#'        "string" for plain strings, "row" for the rows of a DataFrame
+#' @param isCached Whether the RDD is already cached
+#' @param isCheckpointed Whether the RDD has already been checkpointed
+RDD <- function(jrdd, serializedMode = "byte", isCached = FALSE,
+                isCheckpointed = FALSE) {
+  # Arguments are forwarded positionally to the "initialize" method of RDD.
+  rdd <- new("RDD", jrdd, serializedMode, isCached, isCheckpointed)
+  rdd
+}
+
+# Internal constructor for a PipelinedRDD that applies `func` on top of `prev`.
+# No JavaRDD is supplied (jrdd_val is NULL).
+PipelinedRDD <- function(prev, func) {
+  jrddVal <- NULL
+  new("PipelinedRDD", prev, func, jrddVal)
+}
+
+# Generic that reports the serialization mode of an RDD.
+setGeneric("getSerializedMode", function(rdd, ...) { standardGeneric("getSerializedMode") })
+# A plain RDD keeps its mode in its environment, so it is simply looked up.
+setMethod("getSerializedMode", signature(rdd = "RDD"), function(rdd) { rdd@env$serializedMode })
+# A pipelined RDD has a recorded mode only once jrdd_val is set. Until then the
+# mode is not known yet, so the default mode "byte" is reported.
+setMethod("getSerializedMode", signature(rdd = "PipelinedRDD"),
+          function(rdd) {
+            state <- rdd@env
+            if (is.null(state$jrdd_val)) {
+              "byte"
+            } else {
+              state$serializedMode
+            }
+          })
+
+# The jrdd accessor: a plain RDD returns its jrdd slot; a PipelinedRDD builds its JavaRDD on first use.
+setMethod("getJRDD", signature(rdd = "RDD"), function(rdd) rdd@jrdd )
+setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
+ function(rdd, serializedMode = "byte") { # serializedMode: mode requested for the output
+ if (!is.null(rdd@env$jrdd_val)) { # already built: reuse the stored JavaRDD
+ return(rdd@env$jrdd_val)
+ }
+
+ computeFunc <- function(split, part) { # the function that gets serialized below
+ rdd@func(split, part)
+ }
+
+ packageNamesArr <- serialize(.sparkREnv[[".packages"]], # value stored under ".packages", as raw bytes
+ connection = NULL)
+
+ broadcastArr <- lapply(ls(.broadcastNames), # every object stored in .broadcastNames
+ function(name) { get(name, .broadcastNames) })
+
+ serializedFuncArr <- serialize(computeFunc, connection = NULL)
+
+ prev_jrdd <- rdd@prev_jrdd
+
+ if (serializedMode == "string") { # string output uses StringRRDD, which takes no output-mode argument
+ rddRef <- newJObject("org.apache.spark.api.r.StringRRDD",
+ callJMethod(prev_jrdd, "rdd"),
+ serializedFuncArr,
+ rdd@env$prev_serializedMode,
+ packageNamesArr,
+ as.character(.sparkREnv[["libname"]]),
+ broadcastArr,
+ callJMethod(prev_jrdd, "classTag"))
+ } else { # any other mode uses RRDD, passing both the input and output modes
+ rddRef <- newJObject("org.apache.spark.api.r.RRDD",
+ callJMethod(prev_jrdd, "rdd"),
+ serializedFuncArr,
+ rdd@env$prev_serializedMode,
+ serializedMode,
+ packageNamesArr,
+ as.character(.sparkREnv[["libname"]]),
+ broadcastArr,
+ callJMethod(prev_jrdd, "classTag"))
+ }
+ # Record the mode and the JavaRDD in the environment so later calls reuse them
+ rdd@env$serializedMode <- serializedMode
+ rdd@env$jrdd_val <- callJMethod(rddRef, "asJavaRDD") # rddRef$asJavaRDD()
+ rdd@env$jrdd_val
+ })
+
+# Validity method for RDD objects.
+#
+# Looks up the Java class name of the backing JavaRDD and checks it against
+# the pattern "spark.api.java.*RDD*".
+# Returns TRUE when the name matches, otherwise a string describing the
+# problem (the convention expected of S4 validity methods).
+setValidity("RDD",
+            function(object) {
+              jrdd <- getJRDD(object)
+              cls <- callJMethod(jrdd, "getClass")
+              className <- callJMethod(cls, "getName")
+              # grepl() always yields TRUE/FALSE. grep() returns integer(0)
+              # when nothing matches, and `if (integer(0) == 1)` is an error
+              # rather than a clean "invalid" result.
+              if (isTRUE(grepl("spark.api.java.*RDD*", className))) {
+                TRUE
+              } else {
+                paste("Invalid RDD class ", className)
+              }
+            })
+
+
+############ Actions and Transformations ############
+
+#' Persist an RDD
+#'
+#' Calls \code{cache} on the backing JavaRDD, i.e. persists this RDD with the
+#' default storage level (MEMORY_ONLY), and marks the RDD as cached.
+#'
+#' @param x The RDD to cache
+#' @return The RDD \code{x}
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' cache(rdd)
+#'}
+#' @rdname cache-methods
+#' @aliases cache,RDD-method
+setMethod("cache",
+          signature(x = "RDD"),
+          function(x) {
+            jrdd <- getJRDD(x)
+            callJMethod(jrdd, "cache")
+            # The flag lives in the environment, so it is updated in place.
+            x@env$isCached <- TRUE
+            x
+          })
+
+#' Persist an RDD
+#'
+#' Persist this RDD with the given storage level and mark it as cached. The
+#' supported storage levels are described at
+#' http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.
+#'
+#' @param x The RDD to persist
+#' @param newLevel Name of the storage level to assign
+#' @return The RDD \code{x}
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' persist(rdd, "MEMORY_AND_DISK")
+#'}
+#' @rdname persist
+#' @aliases persist,RDD-method
+setMethod("persist",
+          signature(x = "RDD", newLevel = "character"),
+          function(x, newLevel) {
+            jrdd <- getJRDD(x)
+            callJMethod(jrdd, "persist", getStorageLevel(newLevel))
+            x@env$isCached <- TRUE
+            x
+          })
+
+#' Unpersist an RDD
+#'
+#' Mark the RDD as non-persistent, and remove all blocks for it from memory and
+#' disk. The RDD's cached flag is cleared.
+#'
+#' @param x The RDD to unpersist
+#' @return The RDD \code{x}
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' cache(rdd) # rdd@@env$isCached == TRUE
+#' unpersist(rdd) # rdd@@env$isCached == FALSE
+#'}
+#' @rdname unpersist-methods
+#' @aliases unpersist,RDD-method
+setMethod("unpersist",
+          signature(x = "RDD"),
+          function(x) {
+            jrdd <- getJRDD(x)
+            callJMethod(jrdd, "unpersist")
+            x@env$isCached <- FALSE
+            x
+          })
+
+#' Checkpoint an RDD
+#'
+#' Mark this RDD for checkpointing. It will be saved to a file inside the
+#' checkpoint directory set with setCheckpointDir() and all references to its
+#' parent RDDs will be removed. Call this before any job has been executed on
+#' the RDD. Persisting the RDD in memory first is strongly recommended,
+#' otherwise saving it to a file requires recomputation.
+#'
+#' @param x The RDD to checkpoint
+#' @return The RDD \code{x}
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' setCheckpointDir(sc, "checkpoints")
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' checkpoint(rdd)
+#'}
+#' @rdname checkpoint-methods
+#' @aliases checkpoint,RDD-method
+setMethod("checkpoint",
+          signature(x = "RDD"),
+          function(x) {
+            callJMethod(getJRDD(x), "checkpoint")
+            x@env$isCheckpointed <- TRUE
+            x
+          })
+
+#' Gets the number of partitions of an RDD
+#'
+#' @param x A RDD.
+#' @return the size of the backing JavaRDD's list of splits, i.e. the number
+#'         of partitions.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' numPartitions(rdd) # 2L
+#'}
+#' @rdname numPartitions
+#' @aliases numPartitions,RDD-method
+setMethod("numPartitions",
+          signature(x = "RDD"),
+          function(x) {
+            splits <- callJMethod(getJRDD(x), "splits")
+            callJMethod(splits, "size")
+          })
+
+#' Collect elements of an RDD
+#'
+#' @description
+#' \code{collect} fetches every element of this RDD and returns them as a list.
+#'
+#' @param x The RDD to collect
+#' @param ... Other optional arguments to collect
+#' @param flatten FALSE if the list should not be flattened
+#' @return a list containing elements in the RDD
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2L)
+#' collect(rdd) # list from 1 to 10
+#' collectPartition(rdd, 0L) # list from 1 to 5
+#'}
+#' @rdname collect-methods
+#' @aliases collect,RDD-method
+setMethod("collect",
+          signature(x = "RDD"),
+          function(x, flatten = TRUE) {
+            # Assumes a pairwise RDD is backed by a JavaPairRDD.
+            jList <- callJMethod(getJRDD(x), "collect")
+            mode <- getSerializedMode(x)
+            convertJListToRList(jList, flatten, serializedMode = mode)
+          })
+
+
+#' @description
+#' \code{collectPartition} fetches the elements of a single partition of the
+#' RDD and returns them as a list.
+#' @param partitionId the partition to collect (starts from 0)
+#' @rdname collect-methods
+#' @aliases collectPartition,integer,RDD-method
+setMethod("collectPartition",
+          signature(x = "RDD", partitionId = "integer"),
+          function(x, partitionId) {
+            wanted <- as.list(as.integer(partitionId))
+            jPartitionsList <- callJMethod(getJRDD(x), "collectPartitions", wanted)
+            # Only the first returned partition is converted.
+            mode <- getSerializedMode(x)
+            convertJListToRList(jPartitionsList[[1]], flatten = TRUE,
+                                serializedMode = mode)
+          })
+
+#' @description
+#' \code{collectAsMap} collects a key-value pair RDD and returns it as a named
+#' list, using each key (converted to a string) as the name of its value.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L)
+#' collectAsMap(rdd) # list(`1` = 2, `3` = 4)
+#'}
+#' @rdname collect-methods
+#' @aliases collectAsMap,RDD-method
+setMethod("collectAsMap",
+          signature(x = "RDD"),
+          function(x) {
+            # An environment acts as the map: assigning an existing key
+            # overwrites the earlier value.
+            map <- new.env()
+            for (pair in collect(x)) {
+              assign(as.character(pair[[1]]), pair[[2]], envir = map)
+            }
+            as.list(map)
+          })
+
+#' Return the number of elements in the RDD.
+#'
+#' The length of every partition is computed, and the per-partition lengths
+#' are collected and summed.
+#'
+#' @param x The RDD to count
+#' @return number of elements in the RDD.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' count(rdd) # 10
+#' length(rdd) # Same as count
+#'}
+#' @rdname count
+#' @aliases count,RDD-method
+setMethod("count",
+          signature(x = "RDD"),
+          function(x) {
+            partitionSize <- function(part) {
+              as.integer(length(part))
+            }
+            sizes <- collect(lapplyPartition(x, partitionSize))
+            sum(as.integer(sizes))
+          })
+
+#' Return the number of elements in the RDD (delegates to \code{count})
+#' @export
+#' @rdname count
+setMethod("length",
+          signature(x = "RDD"),
+          function(x) {
+            n <- count(x)
+            n
+          })
+
+#' Count how often each distinct value occurs in this RDD and return the
+#' result as a list of (value, count) pairs.
+#'
+#' Same as countByValue in Spark.
+#'
+#' @param x The RDD to count
+#' @return list of (value, count) pairs, where count is the number of
+#'         occurrences of that value in the RDD.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, c(1,2,3,2,1))
+#' countByValue(rdd) # (1,2L), (2,2L), (3,1L)
+#'}
+#' @rdname countByValue
+#' @aliases countByValue,RDD-method
+setMethod("countByValue",
+          signature(x = "RDD"),
+          function(x) {
+            # Pair every element with 1L, then add up the ones per value.
+            withOne <- function(item) { list(item, 1L) }
+            counted <- reduceByKey(lapply(x, withOne), `+`, numPartitions(x))
+            collect(counted)
+          })
+
+#' Apply a function to all elements
+#'
+#' Creates a new RDD whose elements are the results of applying the given
+#' transformation to each element of the given RDD.
+#'
+#' @param X The RDD to apply the transformation.
+#' @param FUN the transformation to apply on each element
+#' @return a new RDD created by the transformation.
+#' @rdname lapply
+#' @aliases lapply
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' multiplyByTwo <- lapply(rdd, function(x) { x * 2 })
+#' collect(multiplyByTwo) # 2,4,6...
+#'}
+setMethod("lapply",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            # Per-partition worker: the partition index is not needed here.
+            applyToPartition <- function(split, iterator) {
+              lapply(iterator, FUN)
+            }
+            lapplyPartitionsWithIndex(X, applyToPartition)
+          })
+
+#' @rdname lapply
+#' @aliases map,RDD,function-method
+setMethod("map",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            # map is an alias of lapply for RDDs.
+            lapply(X = X, FUN = FUN)
+          })
+
+#' Flatten results after applying a function to all elements
+#'
+#' This function returns a new RDD by first applying a function to all
+#' elements of this RDD, and then flattening the results by one level.
+#'
+#' @param X The RDD to apply the transformation.
+#' @param FUN the transformation to apply on each element
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' multiplyByTwo <- flatMap(rdd, function(x) { list(x*2, x*10) })
+#' collect(multiplyByTwo) # 2,20,4,40,6,60...
+#'}
+#' @rdname flatMap
+#' @aliases flatMap,RDD,function-method
+setMethod("flatMap",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            partitionFunc <- function(part) {
+              # recursive = FALSE removes exactly one level of nesting.
+              # FALSE is spelled out because `F` is an ordinary variable that
+              # can be reassigned wherever this closure ends up running.
+              unlist(
+                lapply(part, FUN),
+                recursive = FALSE
+              )
+            }
+            lapplyPartition(X, partitionFunc)
+          })
+
+#' Apply a function to each partition of an RDD
+#'
+#' Build a new RDD by calling a function once per partition of this RDD.
+#'
+#' @param X The RDD to apply the transformation.
+#' @param FUN the transformation to apply on each partition.
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' partitionSum <- lapplyPartition(rdd, function(part) { Reduce("+", part) })
+#' collect(partitionSum) # 15, 40
+#'}
+#' @rdname lapplyPartition
+#' @aliases lapplyPartition,RDD,function-method
+setMethod("lapplyPartition",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            # Adapt FUN to the (index, partition) signature by dropping the index.
+            dropIndex <- function(s, part) { FUN(part) }
+            lapplyPartitionsWithIndex(X, dropIndex)
+          })
+
+#' mapPartitions is an alias of lapplyPartition.
+#'
+#' @rdname lapplyPartition
+#' @aliases mapPartitions,RDD,function-method
+setMethod("mapPartitions",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            lapplyPartition(X = X, FUN = FUN)
+          })
+
+#' Return a new RDD by applying a function to each partition of this RDD, while
+#' also passing the index of the partition to that function.
+#'
+#' @param X The RDD to apply the transformation.
+#' @param FUN the transformation to apply on each partition; it is called with the
+#' partition index and the list of elements in that partition.
+#' @return a new RDD (a PipelinedRDD built on top of \code{X}).
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 5L)
+#' prod <- lapplyPartitionsWithIndex(rdd, function(split, part) {
+#' split * Reduce("+", part) })
+#' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76
+#'}
+#' @rdname lapplyPartitionsWithIndex
+#' @aliases lapplyPartitionsWithIndex,RDD,function-method
+setMethod("lapplyPartitionsWithIndex",
+ signature(X = "RDD", FUN = "function"),
+ function(X, FUN) {
+ FUN <- cleanClosure(FUN) # rebinds FUN, so the wrapper below calls the cleaned function
+ closureCapturingFunc <- function(split, part) { # wrapper handed to PipelinedRDD
+ FUN(split, part)
+ }
+ PipelinedRDD(X, closureCapturingFunc)
+ })
+
+#' @rdname lapplyPartitionsWithIndex
+#' @aliases mapPartitionsWithIndex,RDD,function-method
+setMethod("mapPartitionsWithIndex",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            # Alias of lapplyPartitionsWithIndex.
+            lapplyPartitionsWithIndex(X = X, FUN = FUN)
+          })
+
+#' Keep only the elements of an RDD for which a predicate holds.
+#'
+#' The result is a new RDD containing the elements for which the given logical
+#' function returns TRUE. The same as `filter()' in Spark.
+#'
+#' @param x The RDD to be filtered.
+#' @param f A unary predicate function.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' unlist(collect(filterRDD(rdd, function (x) { x < 3 }))) # c(1, 2)
+#'}
+#' @rdname filterRDD
+#' @aliases filterRDD,RDD,function-method
+setMethod("filterRDD",
+          signature(x = "RDD", f = "function"),
+          function(x, f) {
+            # Each partition is filtered independently.
+            keepMatching <- function(part) {
+              Filter(f, part)
+            }
+            lapplyPartition(x, keepMatching)
+          })
+
+#' @rdname filterRDD
+#' @aliases Filter
+setMethod("Filter",
+          signature(f = "function", x = "RDD"),
+          function(f, x) {
+            # Same operation as filterRDD, with the argument order of base Filter.
+            filterRDD(x = x, f = f)
+          })
+
+#' Reduce across elements of an RDD.
+#'
+#' Combines the elements of this RDD with the given commutative and
+#' associative binary operator: each partition is reduced first, then the
+#' per-partition results are reduced.
+#'
+#' @param x The RDD to reduce
+#' @param func Commutative and associative function to apply on elements
+#'             of the RDD.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' reduce(rdd, "+") # 55
+#'}
+#' @rdname reduce
+#' @aliases reduce,RDD,ANY-method
+setMethod("reduce",
+          signature(x = "RDD", func = "ANY"),
+          function(x, func) {
+            withinPartition <- function(part) {
+              Reduce(func, part)
+            }
+            partials <- lapplyPartition(x, withinPartition)
+            # flatten = FALSE keeps one entry per partition result.
+            Reduce(func, collect(partials, flatten = FALSE))
+          })
+
+#' Get the maximum element of an RDD.
+#'
+#' Implemented as a reduce with \code{max}.
+#'
+#' @param x The RDD to get the maximum element from
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' maximum(rdd) # 10
+#'}
+#' @rdname maximum
+#' @aliases maximum,RDD
+setMethod("maximum",
+          signature(x = "RDD"),
+          function(x) {
+            reduce(x, func = max)
+          })
+
+#' Get the minimum element of an RDD.
+#'
+#' Implemented as a reduce with \code{min}.
+#'
+#' @param x The RDD to get the minimum element from
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' minimum(rdd) # 1
+#'}
+#' @rdname minimum
+#' @aliases minimum,RDD
+setMethod("minimum",
+          signature(x = "RDD"),
+          function(x) {
+            reduce(x, func = min)
+          })
+
+#' Add up the elements in an RDD.
+#'
+#' Implemented as a reduce with \code{"+"}.
+#'
+#' @param x The RDD to add up the elements in
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' sumRDD(rdd) # 55
+#'}
+#' @rdname sumRDD
+#' @aliases sumRDD,RDD
+setMethod("sumRDD",
+          signature(x = "RDD"),
+          function(x) {
+            reduce(x, func = "+")
+          })
+
+#' Applies a function to all elements in an RDD, and forces evaluation.
+#'
+#' The function is run for its side effects only; its results are discarded.
+#'
+#' @param x The RDD to apply the function
+#' @param func The function to be applied.
+#' @return invisible NULL.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' foreach(rdd, function(x) { save(x, file=...) })
+#'}
+#' @rdname foreach
+#' @aliases foreach,RDD,function-method
+setMethod("foreach",
+          signature(x = "RDD", func = "function"),
+          function(x, func) {
+            # Run func on each element, but hand back NULL for the partition.
+            runAndDiscard <- function(part) {
+              lapply(part, func)
+              NULL
+            }
+            # collect() is what triggers the computation.
+            invisible(collect(mapPartitions(x, runAndDiscard)))
+          })
+
+#' Applies a function to each partition in an RDD, and forces evaluation.
+#'
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' foreachPartition(rdd, function(part) { save(part, file=...); NULL })
+#'}
+#' @rdname foreach
+#' @aliases foreachPartition,RDD,function-method
+setMethod("foreachPartition",
+          signature(x = "RDD", func = "function"),
+          function(x, func) {
+            # collect() triggers the computation; its result is returned invisibly.
+            collected <- collect(mapPartitions(x, func))
+            invisible(collected)
+          })
+
+#' Take elements from an RDD.
+#'
+#' Returns the first \code{num} elements of the RDD in a list. Partitions are
+#' fetched one at a time, in order, until enough elements have been gathered
+#' or no partitions are left.
+#'
+#' @param x The RDD to take elements from
+#' @param num Number of elements to take
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' take(rdd, 2L) # list(1, 2)
+#'}
+#' @rdname take
+#' @aliases take,RDD,numeric-method
+setMethod("take",
+          signature(x = "RDD", num = "numeric"),
+          function(x, num) {
+            jrdd <- getJRDD(x)
+            partitionCount <- numPartitions(x)
+            resList <- list()
+            index <- 0
+
+            # TODO(shivaram): Collect more than one partition based on size
+            # estimates similar to the scala version of `take`.
+            while (length(resList) < num && index < partitionCount) {
+              # a JList of byte arrays
+              partitionArr <- callJMethod(jrdd, "collectPartitions",
+                                          as.list(as.integer(index)))
+
+              # Only as many elements as are still missing get converted.
+              stillNeeded <- num - length(resList)
+              elems <- convertJListToRList(partitionArr[[1]],
+                                           flatten = TRUE,
+                                           logicalUpperBound = stillNeeded,
+                                           serializedMode = getSerializedMode(x))
+              # TODO: Check if this append is O(n^2)?
+              resList <- append(resList, elems)
+              index <- index + 1
+            }
+            resList
+          })
+
+#' First
+#'
+#' Return the first element of an RDD, i.e. the single element obtained by
+#' taking one element.
+#'
+#' @rdname first
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' first(rdd)
+#' }
+setMethod("first",
+          signature(x = "RDD"),
+          function(x) {
+            head1 <- take(x, 1)
+            head1[[1]]
+          })
+
+#' Removes the duplicates from RDD.
+#'
+#' Returns a new RDD holding each distinct element of the given RDD once.
+#' The same as `distinct()' in Spark.
+#'
+#' @param x The RDD to remove duplicates from.
+#' @param numPartitions Number of partitions to create.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, c(1,2,2,3,3,3))
+#' sort(unlist(collect(distinct(rdd)))) # c(1, 2, 3)
+#'}
+#' @rdname distinct
+#' @aliases distinct,RDD-method
+setMethod("distinct",
+          signature(x = "RDD"),
+          function(x, numPartitions = SparkR::numPartitions(x)) {
+            # Use each element as a key (with a NULL value), keep one entry
+            # per key, then strip the values again.
+            asKeys <- lapply(x, function(elem) { list(elem, NULL) })
+            onePerKey <- reduceByKey(asKeys,
+                                     function(a, b) { a },
+                                     numPartitions)
+            lapply(onePerKey, function(pair) { pair[[1]] })
+          })
+
+#' Return an RDD that is a sampled subset of the given RDD.
+#'
+#' The same as `sample()' in Spark. (It is renamed because its signature is
+#' inconsistent with the `sample()' function in R's base package.)
+#'
+#' @param x The RDD to sample elements from
+#' @param withReplacement Sampling with replacement or not
+#' @param fraction The (rough) sample target fraction
+#' @param seed Randomness seed value
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10) # ensure each num is in its own split
+#' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
+#' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
+#'}
+#' @rdname sampleRDD
+#' @aliases sampleRDD,RDD
+setMethod("sampleRDD",
+ signature(x = "RDD", withReplacement = "logical",
+ fraction = "numeric", seed = "integer"),
+ function(x, withReplacement, fraction, seed) {
+
+ # The sampler: takes a partition index and the partition, returns the sampled elements.
+ samplingFunc <- function(split, part) {
+ set.seed(seed) # every partition starts from the same seed
+ res <- vector("list", length(part)) # preallocated; grows if sampling with replacement exceeds it
+ len <- 0 # number of elements stored in res so far
+
+ # Discard `split` random values so that partitions with different
+ # indices continue from different points of the random stream.
+ runif(split)
+
+ for (elem in part) {
+ if (withReplacement) {
+ count <- rpois(1, fraction) # number of copies of elem, Poisson with mean `fraction`
+ if (count > 0) {
+ res[(len + 1):(len + count)] <- rep(list(elem), count)
+ len <- len + count
+ }
+ } else {
+ if (runif(1) < fraction) { # keep elem with probability `fraction`
+ len <- len + 1
+ res[[len]] <- elem
+ }
+ }
+ }
+
+ # TODO(zongheng): look into the performance of the current
+ # implementation. Look into some iterator package? Note that
+ # Scala avoids many calls to creating an empty list and PySpark
+ # similarly achieves this using `yield'.
+ if (len > 0)
+ res[1:len]
+ else
+ list()
+ }
+
+ lapplyPartitionsWithIndex(x, samplingFunc)
+ })
+
+#' Return a list of the elements that are a sampled subset of the given RDD.
+#'
+#' The RDD is sampled with a fraction chosen generously enough that a single
+#' pass usually yields at least the requested number of elements; the sample
+#' is then shuffled and truncated. An empty list is returned when \code{num}
+#' is 0 or when the RDD has no elements.
+#'
+#' @param x The RDD to sample elements from
+#' @param withReplacement Sampling with replacement or not
+#' @param num Number of elements to return
+#' @param seed Randomness seed value
+#' @return A list of sampled elements. When sampling without replacement and
+#'         \code{num} exceeds the size of the RDD, all elements are returned.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:100)
+#' # exactly 5 elements sampled, which may not be distinct
+#' takeSample(rdd, TRUE, 5L, 1618L)
+#' # exactly 5 distinct elements sampled
+#' takeSample(rdd, FALSE, 5L, 16181618L)
+#'}
+#' @rdname takeSample
+#' @aliases takeSample,RDD
+setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
+                                  num = "integer", seed = "integer"),
+          function(x, withReplacement, num, seed) {
+            # This function is ported from RDD.scala.
+            multiplier <- 3.0
+            MAXINT <- .Machine$integer.max
+
+            if (num < 0) {
+              stop("Negative number of elements requested")
+            }
+
+            initialCount <- count(x)
+
+            # Nothing to sample. Without this guard an empty RDD leads to a
+            # division by zero below and, with replacement, to a loop that
+            # can never collect enough elements.
+            if (num == 0 || initialCount == 0) {
+              return(list())
+            }
+
+            maxSelected <- if (initialCount > MAXINT - 1) {
+              MAXINT - 1
+            } else {
+              initialCount
+            }
+
+            if (num > initialCount && !withReplacement) {
+              # More requested than available: return everything.
+              total <- maxSelected
+              fraction <- multiplier * (maxSelected + 1) / initialCount
+            } else {
+              total <- num
+              fraction <- multiplier * (num + 1) / initialCount
+            }
+
+            # Draws the integer seed handed to each sampling pass.
+            nextSeed <- function() {
+              as.integer(ceiling(runif(1, -MAXINT, MAXINT)))
+            }
+
+            set.seed(seed)
+            samples <- collect(sampleRDD(x, withReplacement, fraction, nextSeed()))
+            # If the first sample didn't turn out large enough, keep trying to
+            # take samples; this shouldn't happen often because we use a big
+            # multiplier for the initial size
+            while (length(samples) < total) {
+              samples <- collect(sampleRDD(x, withReplacement, fraction, nextSeed()))
+            }
+
+            # TODO(zongheng): investigate if this call is an in-place shuffle?
+            # seq_len() is used because 1:total would be c(1, 0) for total == 0.
+            sample(samples)[seq_len(total)]
+          })
+
+#' Creates tuples of the elements in this RDD by applying a function.
+#'
+#' Each element \code{e} becomes the pair \code{list(func(e), e)}.
+#'
+#' @param x The RDD.
+#' @param func The function to be applied.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1, 2, 3))
+#' collect(keyBy(rdd, function(x) { x*x })) # list(list(1, 1), list(4, 2), list(9, 3))
+#'}
+#' @rdname keyBy
+#' @aliases keyBy,RDD
+setMethod("keyBy",
+          signature(x = "RDD", func = "function"),
+          function(x, func) {
+            lapply(x, function(elem) { list(func(elem), elem) })
+          })
+
+#' Return a new RDD that has exactly numPartitions partitions.
+#'
+#' This can raise or lower the level of parallelism of the RDD. Internally a
+#' shuffle is used to redistribute the data. When lowering the number of
+#' partitions, consider coalesce instead, which can avoid the shuffle.
+#'
+#' @param x The RDD.
+#' @param numPartitions Number of partitions to create.
+#' @seealso coalesce
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5, 6, 7), 4L)
+#' numPartitions(rdd) # 4
+#' numPartitions(repartition(rdd, 2L)) # 2
+#'}
+#' @rdname repartition
+#' @aliases repartition,RDD
+setMethod("repartition",
+          signature(x = "RDD", numPartitions = "numeric"),
+          function(x, numPartitions) {
+            # The third argument (TRUE) asks coalesce to shuffle.
+            target <- numToInt(numPartitions)
+            coalesce(x, target, TRUE)
+          })
+
+#' Return a new RDD that is reduced into numPartitions partitions.
+#'
+#' When \code{shuffle} is TRUE, or when more partitions are requested than the
+#' RDD currently has, the elements are redistributed across the new partitions
+#' in R. Otherwise the work is delegated to the \code{coalesce} method of the
+#' backing JavaRDD.
+#'
+#' @param x The RDD.
+#' @param numPartitions Number of partitions to create.
+#' @param shuffle Whether to redistribute the data with a shuffle.
+#' @seealso repartition
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5), 3L)
+#' numPartitions(rdd) # 3
+#' numPartitions(coalesce(rdd, 1L)) # 1
+#'}
+#' @rdname coalesce
+#' @aliases coalesce,RDD
+setMethod("coalesce",
+          signature(x = "RDD", numPartitions = "numeric"),
+          function(x, numPartitions, shuffle = FALSE) {
+            numPartitions <- numToInt(numPartitions)
+            if (shuffle || numPartitions > SparkR::numPartitions(x)) {
+              func <- function(s, part) {
+                set.seed(s)  # split as seed
+                # Start at a random target partition, then deal the elements
+                # out round-robin; the target becomes the element's key.
+                start <- as.integer(sample(numPartitions, 1) - 1)
+                lapply(seq_along(part),
+                       function(i) {
+                         pos <- (start + i) %% numPartitions
+                         list(pos, part[[i]])
+                       })
+              }
+              shuffled <- lapplyPartitionsWithIndex(x, func)
+              repartitioned <- partitionBy(shuffled, numPartitions)
+              values(repartitioned)
+            } else {
+              jrdd <- callJMethod(getJRDD(x), "coalesce", numPartitions, shuffle)
+              # The coalesced JavaRDD holds the same elements as x, so carry
+              # over x's serialization mode rather than falling back to the
+              # constructor default of "byte".
+              RDD(jrdd, getSerializedMode(x))
+            }
+          })
+
+#' Save this RDD as a SequenceFile of serialized objects.
+#'
+#' @param x The RDD to save
+#' @param path The directory where the file is saved
+#' @seealso objectFile
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:3)
+#' saveAsObjectFile(rdd, "/tmp/sparkR-tmp")
+#'}
+#' @rdname saveAsObjectFile
+#' @aliases saveAsObjectFile,RDD
+setMethod("saveAsObjectFile",
+          signature(x = "RDD", path = "character"),
+          function(x, path) {
+            # objectFile() assumes serializedMode == "byte", so an RDD in any
+            # other mode is serialized to bytes before it is written.
+            toSave <- if (getSerializedMode(x) != "byte") {
+              serializeToBytes(x)
+            } else {
+              x
+            }
+            # Return nothing
+            invisible(callJMethod(getJRDD(toSave), "saveAsObjectFile", path))
+          })
+
+#' Save this RDD as a text file, using string representations of elements.
+#'
+#' Every element is converted with \code{toString} before being written.
+#'
+#' @param x The RDD to save
+#' @param path The directory where the splits of the text file are saved
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:3)
+#' saveAsTextFile(rdd, "/tmp/sparkR-tmp")
+#'}
+#' @rdname saveAsTextFile
+#' @aliases saveAsTextFile,RDD
+setMethod("saveAsTextFile",
+          signature(x = "RDD", path = "character"),
+          function(x, path) {
+            stringRdd <- lapply(x, function(elem) { toString(elem) })
+            # Request the JavaRDD in "string" mode.
+            jrdd <- getJRDD(stringRdd, serializedMode = "string")
+            # Return nothing
+            invisible(callJMethod(jrdd, "saveAsTextFile", path))
+          })
+
+#' Sort an RDD by the given key function.
+#'
+#' Each element is keyed with \code{func}, the pairs are sorted by key, and
+#' the keys are dropped again.
+#'
+#' @param x An RDD to be sorted.
+#' @param func A function used to compute the sort key for each element.
+#' @param ascending A flag to indicate whether the sorting is ascending or descending.
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where all elements are sorted.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(3, 2, 1))
+#' collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
+#'}
+#' @rdname sortBy
+#' @aliases sortBy,RDD,RDD-method
+setMethod("sortBy",
+          signature(x = "RDD", func = "function"),
+          function(x, func, ascending = TRUE, numPartitions = SparkR::numPartitions(x)) {
+            keyed <- keyBy(x, func)
+            sorted <- sortByKey(keyed, ascending, numPartitions)
+            values(sorted)
+          })
+
+# Helper function to get first N elements from an RDD in the specified order.
+# Param:
+#   x An RDD.
+#   num Number of elements to return.
+#   ascending A flag to indicate whether the sorting is ascending or descending.
+# Return:
+#   A list of the first N elements from the RDD in the specified order, or all
+#   elements (ordered) when the RDD holds fewer than N. An empty list when
+#   num <= 0.
+#
+takeOrderedElem <- function(x, num, ascending = TRUE) {
+  if (num <= 0L) {
+    return(list())
+  }
+
+  # Reduce one partition to (at most) its first `num` elements, wrapped in a
+  # list so the partition contributes a single element.
+  partitionFunc <- function(part) {
+    if (num < length(part)) {
+      # R limitation: order works only on primitive types!
+      ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending)
+      list(part[ord[1:num]])
+    } else {
+      list(part)
+    }
+  }
+
+  # Merge two candidate lists and keep the first `num` of the combination.
+  reduceFunc <- function(elems, part) {
+    newElems <- append(elems, part)
+    # R limitation: order works only on primitive types!
+    ord <- order(unlist(newElems, recursive = FALSE), decreasing = !ascending)
+    # Never index past the end: ord[1:num] would contain NA when fewer than
+    # `num` elements exist, and indexing a list with NA yields NULL entries.
+    newElems[ord[seq_len(min(num, length(ord)))]]
+  }
+
+  newRdd <- mapPartitions(x, partitionFunc)
+  reduce(newRdd, reduceFunc)
+}
+
+#' Returns the first N elements from an RDD in ascending order.
+#'
+#' @param x An RDD.
+#' @param num Number of elements to return.
+#' @return The first N elements from the RDD in ascending order.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
+#' takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
+#'}
+#' @rdname takeOrdered
+#' @aliases takeOrdered,RDD,RDD-method
+setMethod("takeOrdered",
+          signature(x = "RDD", num = "integer"),
+          function(x, num) {
+            # Ascending order is spelled out instead of relying on the default.
+            takeOrderedElem(x, num, ascending = TRUE)
+          })
+
+#' Returns the top N elements from an RDD.
+#'
+#' @param x An RDD.
+#' @param num Number of elements to return.
+#' @return The top N elements from the RDD.
+#' @rdname top
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
+#' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
+#'}
+#' @rdname top
+#' @aliases top,RDD,RDD-method
+setMethod("top",
+          signature(x = "RDD", num = "integer"),
+          function(x, num) {
+            # "Top" means the first elements in descending order.
+            takeOrderedElem(x, num, ascending = FALSE)
+          })
+
+#' Fold an RDD using a given associative function and a neutral "zero value".
+#'
+#' Aggregate the elements of each partition, and then the results for all the
+#' partitions, using a given associative function and a neutral "zero value".
+#'
+#' @param x An RDD.
+#' @param zeroValue A neutral "zero value".
+#' @param op An associative function for the folding operation.
+#' @return The folding result.
+#' @rdname fold
+#' @seealso reduce
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
+#' fold(rdd, 0, "+") # 15
+#'}
+#' @rdname fold
+#' @aliases fold,RDD,RDD-method
+setMethod("fold",
+          signature(x = "RDD", zeroValue = "ANY", op = "ANY"),
+          function(x, zeroValue, op) {
+            # The same function is used both within and across partitions.
+            aggregateRDD(x, zeroValue, seqOp = op, combOp = op)
+          })
+
+#' Aggregate an RDD using the given combine functions and a neutral "zero value".
+#'
+#' Aggregate the elements of each partition, and then the results for all the
+#' partitions, using given combine functions and a neutral "zero value".
+#'
+#' @param x An RDD.
+#' @param zeroValue A neutral "zero value".
+#' @param seqOp A function to aggregate the RDD elements. It may return a different
+#' result type from the type of the RDD elements.
+#' @param combOp A function to aggregate results of seqOp.
+#' @return The aggregation result.
+#' @rdname aggregateRDD
+#' @seealso reduce
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1, 2, 3, 4))
+#' zeroValue <- list(0, 0)
+#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+#' aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
+#'}
+#' @rdname aggregateRDD
+#' @aliases aggregateRDD,RDD,RDD-method
+setMethod("aggregateRDD",
+          signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"),
+          function(x, zeroValue, seqOp, combOp) {
+            # Fold one partition's elements into a single value, starting from zeroValue.
+            foldPartition <- function(part) {
+              Reduce(seqOp, part, zeroValue)
+            }
+
+            # Collect one partial result per partition (not flattened) ...
+            perPartition <- lapplyPartition(x, foldPartition)
+            partials <- collect(perPartition, flatten = FALSE)
+            # ... and combine the partial results, again starting from zeroValue.
+            Reduce(combOp, partials, zeroValue)
+          })
+
+#' Pipes elements to a forked external process.
+#'
+#' The same as 'pipe()' in Spark.
+#'
+#' @param x The RDD whose elements are piped to the forked external process.
+#' @param command The command to fork an external process.
+#' @param env A named list to set environment variables of the external process.
+#'            A character vector of "NAME=value" strings is also accepted.
+#' @return A new RDD created by piping all elements to a forked external process.
+#' @rdname pipeRDD
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' collect(pipeRDD(rdd, "more"))
+#' # Output: c("1", "2", ..., "10")
+#'}
+#' @rdname pipeRDD
+#' @aliases pipeRDD,RDD,character-method
+setMethod("pipeRDD",
+          signature(x = "RDD", command = "character"),
+          function(x, command, env = list()) {
+            # system2() expects `env` as a character vector of "NAME=value"
+            # strings. Passing a named list through unchanged drops the names,
+            # so build the strings here. Entries without a name are kept as
+            # they are, which preserves already formatted "NAME=value" input.
+            envValues <- vapply(env,
+                                function(v) { paste(v, collapse = " ") },
+                                character(1),
+                                USE.NAMES = FALSE)
+            envNames <- names(env)
+            envVars <- if (is.null(envNames)) {
+              envValues
+            } else {
+              as.character(ifelse(nzchar(envNames),
+                                  paste(envNames, envValues, sep = "="),
+                                  envValues))
+            }
+
+            func <- function(part) {
+              # Render an element as a string without trailing line breaks.
+              trim.trailing.func <- function(x) {
+                sub("[\r\n]*$", "", toString(x))
+              }
+              input <- unlist(lapply(part, trim.trailing.func))
+              # Feed the partition to the command on stdin, capture its stdout.
+              res <- system2(command, stdout = TRUE, input = input, env = envVars)
+              lapply(res, trim.trailing.func)
+            }
+            lapplyPartition(x, func)
+          })
+
+# TODO: Consider caching the name in the RDD's environment
+#' Return an RDD's name.
+#'
+#' @param x The RDD whose name is returned.
+#' @rdname name
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1,2,3))
+#' name(rdd) # NULL (if not set before)
+#'}
+#' @rdname name
+#' @aliases name,RDD
+setMethod("name",
+          signature(x = "RDD"),
+          function(x) {
+            # The name is read from the backing JavaRDD on every call.
+            jrdd <- getJRDD(x)
+            callJMethod(jrdd, "name")
+          })
+
+#' Set an RDD's name.
+#'
+#' @param x The RDD whose name is to be set.
+#' @param name The RDD name to be set.
+#' @return The RDD \code{x} itself, after \code{setName} has been invoked on
+#'         its backing JavaRDD.
+#' @rdname setName
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(1,2,3))
+#' setName(rdd, "myRDD")
+#' name(rdd) # "myRDD"
+#'}
+#' @rdname setName
+#' @aliases setName,RDD
+setMethod("setName",
+          signature(x = "RDD", name = "character"),
+          function(x, name) {
+            jrdd <- getJRDD(x)
+            # The JVM call's result is discarded; the R-side RDD is handed back.
+            callJMethod(jrdd, "setName", name)
+            x
+          })
+
+#' Zip an RDD with generated unique Long IDs.
+#'
+#' Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
+#' n is the number of partitions. So there may exist gaps, but this
+#' method won't trigger a spark job, which is different from
+#' zipWithIndex.
+#'
+#' @param x An RDD to be zipped.
+#' @return An RDD with zipped items.
+#' @seealso zipWithIndex
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
+#' collect(zipWithUniqueId(rdd))
+#' # list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2))
+#'}
+#' @rdname zipWithUniqueId
+#' @aliases zipWithUniqueId,RDD
+setMethod("zipWithUniqueId",
+          signature(x = "RDD"),
+          function(x) {
+            partitionCount <- numPartitions(x)
+
+            # Pair the i-th item (1-based) of partition `split` with the id
+            # (i - 1) * partitionCount + split.
+            tagPartition <- function(split, part) {
+              Map(
+                function(item, index) {
+                  list(item, (index - 1) * partitionCount + split)
+                },
+                part,
+                seq_along(part))
+            }
+
+            lapplyPartitionsWithIndex(x, tagPartition)
+          })
+
+#' Zip an RDD with its element indices.
+#'
+#' The ordering is first based on the partition index and then the
+#' ordering of items within each partition. So the first item in
+#' the first partition gets index 0, and the last item in the last
+#' partition receives the largest index.
+#'
+#' This method needs to trigger a Spark job when this RDD contains
+#' more than one partition.
+#'
+#' @param x An RDD to be zipped.
+#' @return An RDD with zipped items.
+#' @seealso zipWithUniqueId
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
+#' collect(zipWithIndex(rdd))
+#' # list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4))
+#'}
+#' @rdname zipWithIndex
+#' @aliases zipWithIndex,RDD
+setMethod("zipWithIndex",
+          signature(x = "RDD"),
+          function(x) {
+            partitionCount <- numPartitions(x)
+            if (partitionCount > 1) {
+              # One job to learn each partition's size; the running totals
+              # give the first index of every following partition.
+              sizes <- collect(lapplyPartition(x,
+                                               function(part) {
+                                                 list(length(part))
+                                               }))
+              cumulativeSizes <- Reduce("+", sizes, accumulate = TRUE)
+            }
+
+            indexPartition <- function(split, part) {
+              # Partition 0 starts at index 0; partition k starts after the
+              # combined size of partitions 0 .. k-1.
+              offset <- if (split == 0) 0 else cumulativeSizes[[split]]
+
+              Map(
+                function(item, index) {
+                  list(item, index - 1 + offset)
+                },
+                part,
+                seq_along(part))
+            }
+
+            lapplyPartitionsWithIndex(x, indexPartition)
+          })
+
+#' Coalesce all elements within each partition of an RDD into a list.
+#'
+#' @param x An RDD.
+#' @return An RDD created by coalescing all elements within
+#' each partition into a list.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, as.list(1:4), 2L)
+#' collect(glom(rdd))
+#' # list(list(1, 2), list(3, 4))
+#'}
+#' @rdname glom
+#' @aliases glom,RDD
+setMethod("glom",
+          signature(x = "RDD"),
+          function(x) {
+            # Wrap each partition in an outer list so that the whole partition
+            # becomes a single element of the resulting RDD.
+            lapplyPartition(x, function(part) {
+              list(part)
+            })
+          })
+
+############ Binary Functions #############
+
+#' Return the union RDD of two RDDs.
+#' The same as union() in Spark.
+#'
+#' @param x An RDD.
+#' @param y An RDD.
+#' @return a new RDD created by performing the simple union (without removing
+#' duplicates) of two input RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:3)
+#' unionRDD(rdd, rdd) # 1, 2, 3, 1, 2, 3
+#'}
+#' @rdname unionRDD
+#' @aliases unionRDD,RDD,RDD-method
+setMethod("unionRDD",
+          signature(x = "RDD", y = "RDD"),
+          function(x, y) {
+            resultMode <- getSerializedMode(x)
+            if (resultMode != getSerializedMode(y)) {
+              # The modes differ, so bring whichever side is not already in
+              # "byte" mode to "byte" before taking the union.
+              if (getSerializedMode(x) != "byte") x <- serializeToBytes(x)
+              if (getSerializedMode(y) != "byte") y <- serializeToBytes(y)
+              resultMode <- "byte"
+            }
+            jrdd <- callJMethod(getJRDD(x), "union", getJRDD(y))
+            RDD(jrdd, resultMode)
+          })
+
+#' Zip an RDD with another RDD.
+#'
+#' Zips this RDD with another one, returning key-value pairs with the
+#' first element in each RDD, second element in each RDD, etc. Assumes
+#' that the two RDDs have the same number of partitions and the same
+#' number of elements in each partition (e.g. one was made through
+#' a map on the other).
+#'
+#' @param x An RDD to be zipped.
+#' @param other Another RDD to be zipped.
+#' @return An RDD zipped from the two RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, 0:4)
+#' rdd2 <- parallelize(sc, 1000:1004)
+#' collect(zipRDD(rdd1, rdd2))
+#' # list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004))
+#'}
+#' @rdname zipRDD
+#' @aliases zipRDD,RDD
+setMethod("zipRDD",
+ signature(x = "RDD", other = "RDD"),
+ function(x, other) {
+ # Fail early when the partition counts differ.
+ n1 <- numPartitions(x)
+ n2 <- numPartitions(other)
+ if (n1 != n2) {
+ stop("Can only zip RDDs which have the same number of partitions.")
+ }
+
+ if (getSerializedMode(x) != getSerializedMode(other) ||
+ getSerializedMode(x) == "byte") {
+ # Append the number of elements in each partition to that partition so that we can later
+ # check if corresponding partitions of both RDDs have the same number of elements.
+ #
+ # Note that this appending also serves the purpose of reserialization, because even if
+ # any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
+ # as a single byte array. For example, partitions of an RDD generated from partitionBy()
+ # may be encoded as multiple byte arrays.
+ appendLength <- function(part) {
+ # The appended value is the partition's new length, i.e. the original
+ # element count plus one for this trailing entry itself.
+ part[[length(part) + 1]] <- length(part) + 1
+ part
+ }
+ x <- lapplyPartition(x, appendLength)
+ other <- lapplyPartition(other, appendLength)
+ }
+
+ zippedJRDD <- callJMethod(getJRDD(x), "zip", getJRDD(other))
+ # The zippedRDD's elements are of scala Tuple2 type. The serialized
+ # flag Here is used for the elements inside the tuples.
+ serializerMode <- getSerializedMode(x)
+ zippedRDD <- RDD(zippedJRDD, serializerMode)
+
+ # Rebuilds list(key, value) pairs from one zipped partition.
+ partitionFunc <- function(split, part) {
+ len <- length(part)
+ if (len > 0) {
+ if (serializerMode == "byte") {
+ # The last entry is the length recorded for the values; the entry just
+ # before the values is the length recorded for the keys.
+ # NOTE(review): this presumes the partition arrives as the entries of x
+ # followed by the entries of other - confirm against the deserializer.
+ lengthOfValues <- part[[len]]
+ lengthOfKeys <- part[[len - lengthOfValues]]
+ stopifnot(len == lengthOfKeys + lengthOfValues)
+
+ # check if corresponding partitions of both RDDs have the same number of elements.
+ if (lengthOfKeys != lengthOfValues) {
+ stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
+ }
+
+ if (lengthOfKeys > 1) {
+ # Drop the trailing length entry from each half.
+ keys <- part[1 : (lengthOfKeys - 1)]
+ values <- part[(lengthOfKeys + 1) : (len - 1)]
+ } else {
+ # A recorded length of 1 means the half holds only its length entry.
+ keys <- list()
+ values <- list()
+ }
+ } else {
+ # Keys, values must have same length here, because this has
+ # been validated inside the JavaRDD.zip() function.
+ # The logical masks are recycled: odd positions are keys, even positions are values.
+ keys <- part[c(TRUE, FALSE)]
+ values <- part[c(FALSE, TRUE)]
+ }
+ mapply(
+ function(k, v) {
+ list(k, v)
+ },
+ keys,
+ values,
+ SIMPLIFY = FALSE,
+ USE.NAMES = FALSE)
+ } else {
+ part
+ }
+ }
+
+ PipelinedRDD(zippedRDD, partitionFunc)
+ })
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/SQLContext.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
new file mode 100644
index 0000000..930ada2
--- /dev/null
+++ b/R/pkg/R/SQLContext.R
@@ -0,0 +1,520 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# SQLcontext.R: SQLContext-driven functions
+
+#' infer the SQL type
+#'
+#' Maps an R value to a description of a SQL type: either a type name (a
+#' string) for length-one atomic values, or a nested list describing an
+#' array, map or struct type.
+#'
+#' @param x The R value whose type is inferred. Must not be NULL; lists and
+#'          environments must not be empty.
+#' @return A string naming the type, or a list with a \code{type} field
+#'         ("array", "map" or "struct") and the matching element fields.
+infer_type <- function(x) {
+  if (is.null(x)) {
+    stop("can not infer type from NULL")
+  }
+
+  # class of POSIXlt is c("POSIXlt" "POSIXt")
+  type <- switch(class(x)[[1]],
+                 integer = "integer",
+                 character = "string",
+                 logical = "boolean",
+                 double = "double",
+                 numeric = "double",
+                 raw = "binary",
+                 list = "array",
+                 environment = "map",
+                 Date = "date",
+                 POSIXlt = "timestamp",
+                 POSIXct = "timestamp",
+                 # Collapse the class vector so that objects with several
+                 # classes produce one readable message.
+                 stop(paste("Unsupported type for DataFrame:",
+                            paste(class(x), collapse = ", "))))
+
+  if (type == "map") {
+    # The value type is inferred from the first key returned by ls().
+    stopifnot(length(x) > 0)
+    key <- ls(x)[[1]]
+    list(type = "map",
+         keyType = "string",
+         valueType = infer_type(get(key, x)),
+         valueContainsNull = TRUE)
+  } else if (type == "array") {
+    stopifnot(length(x) > 0)
+    fieldNames <- names(x)
+    if (is.null(fieldNames)) {
+      # Unnamed list: the element type is inferred from the first element.
+      list(type = "array", elementType = infer_type(x[[1]]), containsNull = TRUE)
+    } else {
+      # StructType
+      types <- lapply(x, infer_type)
+      fields <- lapply(seq_along(x), function(i) {
+        list(name = fieldNames[[i]], type = types[[i]], nullable = TRUE)
+      })
+      list(type = "struct", fields = fields)
+    }
+  } else if (length(x) > 1) {
+    # An atomic vector with several elements is an array of that type.
+    list(type = "array", elementType = type, containsNull = TRUE)
+  } else {
+    type
+  }
+}
+
+#' dump the schema into JSON string
+#'
+#' Named lists become JSON objects, unnamed lists become JSON arrays,
+#' character values become quoted strings and logical values become
+#' true/false. Any other type is an error.
+#'
+#' @param x A (possibly nested) list, a character value or a logical value.
+#' @return The JSON text as a string.
+tojson <- function(x) {
+  if (is.list(x)) {
+    fieldNames <- names(x)
+    if (!is.null(fieldNames)) {
+      # Walk by position so that repeated names keep their own values.
+      # Names are escaped by the character branch below.
+      items <- lapply(seq_along(x), function(i) {
+        paste(tojson(fieldNames[[i]]), ':', tojson(x[[i]]), sep = '')
+      })
+      d <- paste(items, collapse = ', ')
+      paste('{', d, '}', sep = '')
+    } else {
+      l <- paste(lapply(x, tojson), collapse = ', ')
+      paste('[', l, ']', sep = '')
+    }
+  } else if (is.character(x)) {
+    # fixed = TRUE makes the replacement literal; in regex mode '\\"' is read
+    # as an escaped quote and would insert no backslash at all.
+    # Backslashes are doubled first so the ones added for quotes stay single.
+    escaped <- gsub('\\', '\\\\', x, fixed = TRUE)
+    escaped <- gsub('"', '\\"', escaped, fixed = TRUE)
+    paste('"', escaped, '"', sep = '')
+  } else if (is.logical(x)) {
+    if (x) "true" else "false"
+  } else {
+    stop(paste("unexpected type:", paste(class(x), collapse = ", ")))
+  }
+}
+
+#' Create a DataFrame from an RDD
+#'
+#' Converts an RDD, a list or a local data.frame to a DataFrame. Unless a
+#' complete (named) schema is given, the column types are inferred from the
+#' first row. For a data.frame the column names always replace \code{schema}.
+#'
+#' @param sqlCtx A SQLContext
+#' @param data An RDD or list or data.frame
+#' @param schema a list of column names or named list (StructType), optional
+#' @param samplingRatio Currently not used by this function.
+#' @return a DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
+#' df <- createDataFrame(sqlCtx, rdd)
+#' }
+
+# TODO(davies): support sampling and infer type from NA
+createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
+  if (is.data.frame(data)) {
+    # get the names of columns, they will be put into RDD
+    schema <- names(data)
+    n <- nrow(data)
+    m <- ncol(data)
+    # get rid of factor type
+    dropFactor <- function(x) {
+      if (is.factor(x)) {
+        as.character(x)
+      } else {
+        x
+      }
+    }
+    # Convert to a list of rows, each row a list of cells. seq_len() is used
+    # because 1:n is c(1, 0) for an empty data.frame and would make up rows.
+    data <- lapply(seq_len(n), function(i) {
+      lapply(seq_len(m), function(j) { dropFactor(data[i,j]) })
+    })
+  }
+  if (is.list(data)) {
+    sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sqlCtx)
+    rdd <- parallelize(sc, data)
+  } else if (inherits(data, "RDD")) {
+    rdd <- data
+  } else {
+    stop(paste("unexpected type:", paste(class(data), collapse = ", ")))
+  }
+
+  # No schema, or only column names: infer the types from the first row.
+  if (is.null(schema) || is.null(names(schema))) {
+    row <- first(rdd)
+    colNames <- if (is.null(schema)) {
+      names(row)
+    } else {
+      as.list(schema)
+    }
+    if (is.null(colNames)) {
+      # Fall back to positional names _1, _2, ...
+      colNames <- lapply(seq_along(row), function(x) {
+        paste("_", as.character(x), sep = "")
+      })
+    }
+
+    # SPARK-SQL does not support '.' in column name, so replace it with '_'
+    # TODO(davies): remove this once SPARK-2775 is fixed
+    colNames <- lapply(colNames, function(n) {
+      nn <- gsub("[.]", "_", n)
+      if (nn != n) {
+        warning(paste("Use", nn, "instead of", n, " as column name"))
+      }
+      nn
+    })
+
+    types <- lapply(row, infer_type)
+    fields <- lapply(seq_along(row), function(i) {
+      list(name = colNames[[i]], type = types[[i]], nullable = TRUE)
+    })
+    schema <- list(type = "struct", fields = fields)
+  }
+
+  stopifnot(class(schema) == "list")
+  stopifnot(schema$type == "struct")
+  stopifnot(class(schema$fields) == "list")
+  schemaString <- tojson(schema)
+
+  # The identity lapply yields an RDD whose JavaRDD is requested in "row" mode.
+  jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
+  srdd <- callJMethod(jrdd, "rdd")
+  sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF",
+                     srdd, schemaString, sqlCtx)
+  dataFrame(sdf)
+}
+
+#' toDF
+#'
+#' Converts an RDD to a DataFrame by infer the types.
+#'
+#' @param x An RDD
+#'
+#' @rdname DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
+#' df <- toDF(rdd)
+#' }
+
+# Declare the toDF generic; the method below handles RDD arguments.
+setGeneric("toDF", function(x, ...) { standardGeneric("toDF") })
+
+setMethod("toDF", signature(x = "RDD"),
+          function(x, ...) {
+            # Context names looked up in .sparkREnv, in order of preference.
+            ctxNames <- c(".sparkRHivesc", ".sparkRSQLsc")
+            found <- Filter(function(nm) { exists(nm, envir = .sparkREnv) }, ctxNames)
+            if (length(found) == 0) {
+              stop("no SQL context available")
+            }
+            sqlCtx <- get(found[[1]], envir = .sparkREnv)
+            createDataFrame(sqlCtx, x, ...)
+          })
+
+#' Create a DataFrame from a JSON file.
+#'
+#' Loads a JSON file (one object per line), returning the result as a DataFrame
+#' It goes through the entire dataset once to determine the schema.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @return DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' }
+
+jsonFile <- function(sqlCtx, path) {
+  # Allow the user to have a more flexible definition of the text file path
+  resolved <- normalizePath(path)
+  # A vector of paths is handed over as one comma separated string
+  joined <- paste(resolved, collapse = ",")
+  dataFrame(callJMethod(sqlCtx, "jsonFile", joined))
+}
+
+
+#' JSON RDD
+#'
+#' Loads an RDD storing one JSON object per string as a DataFrame.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param rdd An RDD of JSON string
+#' @param schema A StructType object to use as schema (not implemented yet;
+#'               anything other than NULL raises an error)
+#' @param samplingRatio The ratio of sampling used to infer the schema
+#' @return A DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' rdd <- textFile(sc, "path/to/json")
+#' df <- jsonRDD(sqlCtx, rdd)
+#' }
+
+# TODO: support schema
+jsonRDD <- function(sqlCtx, rdd, schema = NULL, samplingRatio = 1.0) {
+  rdd <- serializeToString(rdd)
+  if (!is.null(schema)) {
+    stop("not implemented")
+  }
+  # Hand over the underlying rdd of the JavaRDD together with the sampling ratio.
+  jrdd <- callJMethod(getJRDD(rdd), "rdd")
+  dataFrame(callJMethod(sqlCtx, "jsonRDD", jrdd, samplingRatio))
+}
+
+
+#' Create a DataFrame from a Parquet file.
+#'
+#' Loads a Parquet file, returning the result as a DataFrame.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param ... Path(s) of parquet file(s) to read.
+#' @return DataFrame
+#' @export
+
+# TODO: Implement saveasParquetFile and write examples for both
+parquetFile <- function(sqlCtx, ...) {
+  # Allow the user to have a more flexible definition of the text file path
+  rawPaths <- list(...)
+  paths <- lapply(rawPaths, normalizePath)
+  dataFrame(callJMethod(sqlCtx, "parquetFile", paths))
+}
+
+#' SQL Query
+#'
+#' Executes a SQL query using Spark, returning the result as a DataFrame.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param sqlQuery A character vector containing the SQL query
+#' @return DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' registerTempTable(df, "table")
+#' new_df <- sql(sqlCtx, "SELECT * FROM table")
+#' }
+
+sql <- function(sqlCtx, sqlQuery) {
+  # Run the query through the SQLContext and wrap the result for R.
+  dataFrame(callJMethod(sqlCtx, "sql", sqlQuery))
+}
+
+
+#' Create a DataFrame from a SparkSQL Table
+#'
+#' Returns the specified Table as a DataFrame. The Table must have already been registered
+#' in the SQLContext.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param tableName The SparkSQL Table to convert to a DataFrame.
+#' @return DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' registerTempTable(df, "table")
+#' new_df <- table(sqlCtx, "table")
+#' }
+
+table <- function(sqlCtx, tableName) {
+  # Look the table up through the SQLContext and wrap the result for R.
+  dataFrame(callJMethod(sqlCtx, "table", tableName))
+}
+
+
+#' Tables
+#'
+#' Returns a DataFrame containing names of tables in the given database.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param databaseName name of the database
+#' @return a DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' tables(sqlCtx, "hive")
+#' }
+
+tables <- function(sqlCtx, databaseName = NULL) {
+  # Without a database name the no-argument form of "tables" is called.
+  if (is.null(databaseName)) {
+    jdf <- callJMethod(sqlCtx, "tables")
+  } else {
+    jdf <- callJMethod(sqlCtx, "tables", databaseName)
+  }
+  dataFrame(jdf)
+}
+
+
+#' Table Names
+#'
+#' Returns the names of tables in the given database as an array.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param databaseName name of the database
+#' @return a list of table names
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' tableNames(sqlCtx, "hive")
+#' }
+
+tableNames <- function(sqlCtx, databaseName = NULL) {
+  if (!is.null(databaseName)) {
+    return(callJMethod(sqlCtx, "tableNames", databaseName))
+  }
+  # No database given: use the no-argument form.
+  callJMethod(sqlCtx, "tableNames")
+}
+
+
+#' Cache Table
+#'
+#' Caches the specified table in-memory.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param tableName The name of the table being cached
+#' @return Whatever the underlying \code{cacheTable} call on the SQLContext
+#'         returns. NOTE(review): this is presumably not a DataFrame, since
+#'         the result is not wrapped with \code{dataFrame()} - confirm.
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' registerTempTable(df, "table")
+#' cacheTable(sqlCtx, "table")
+#' }
+
+cacheTable <- function(sqlCtx, tableName) {
+ callJMethod(sqlCtx, "cacheTable", tableName)
+}
+
+#' Uncache Table
+#'
+#' Removes the specified table from the in-memory cache.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param tableName The name of the table being uncached
+#' @return Whatever the underlying \code{uncacheTable} call on the SQLContext
+#'         returns. NOTE(review): this is presumably not a DataFrame, since
+#'         the result is not wrapped with \code{dataFrame()} - confirm.
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' registerTempTable(df, "table")
+#' uncacheTable(sqlCtx, "table")
+#' }
+
+uncacheTable <- function(sqlCtx, tableName) {
+ callJMethod(sqlCtx, "uncacheTable", tableName)
+}
+
+#' Clear Cache
+#'
+#' Removes all cached tables from the in-memory cache.
+#'
+#' @param sqlCtx SQLContext to use
+#' @examples
+#' \dontrun{
+#' clearCache(sqlCtx)
+#' }
+
+# Forwards to the "clearCache" method of the given SQLContext reference and
+# returns whatever that call yields.
+clearCache <- function(sqlCtx) {
+ callJMethod(sqlCtx, "clearCache")
+}
+
+#' Drop Temporary Table
+#'
+#' Drops the temporary table with the given table name in the catalog.
+#' If the table has been cached/persisted before, it's also unpersisted.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param tableName The name of the SparkSQL table to be dropped.
+#' @examples
+#' \dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df <- loadDF(sqlCtx, path, "parquet")
+#' registerTempTable(df, "table")
+#' dropTempTable(sqlCtx, "table")
+#' }
+
+# Validates that tableName is a character value, then forwards to the
+# "dropTempTable" method of the given SQLContext reference.
+dropTempTable <- function(sqlCtx, tableName) {
+  # is.character() looks at the underlying type. Comparing class() with a
+  # string yields one result per class and is not a valid `if` condition
+  # for objects that carry more than one class.
+  if (!is.character(tableName)) {
+    stop("tableName must be a string.")
+  }
+  callJMethod(sqlCtx, "dropTempTable", tableName)
+}
+
+#' Load a DataFrame
+#'
+#' Returns the dataset in a data source as a DataFrame
+#'
+#' The data source is specified by the `source` and a set of options(...).
+#' If `source` is not specified, the default data source configured by
+#' "spark.sql.sources.default" will be used.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param path The path of files to load
+#' @param source the name of external data source
+#' @param ... Additional options for the data source
+#' @return DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df <- loadDF(sqlCtx, "path/to/file.json", source = "json")
+#' }
+
+loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) {
+  sourceOptions <- varargsToEnv(...)
+  # A given path is passed along as the 'path' option.
+  if (!is.null(path)) {
+    sourceOptions[['path']] <- path
+  }
+  dataFrame(callJMethod(sqlCtx, "load", source, sourceOptions))
+}
+
+#' Create an external table
+#'
+#' Creates an external table based on the dataset in a data source and
+#' returns the DataFrame associated with the external table.
+#'
+#' The data source is specified by the `source` and a set of options(...).
+#' If `source` is not specified, the default data source configured by
+#' "spark.sql.sources.default" will be used.
+#'
+#' @param sqlCtx SQLContext to use
+#' @param tableName A name of the table
+#' @param path The path of files to load
+#' @param source the name of external data source
+#' @param ... Additional options for the data source
+#' @return DataFrame
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df <- createExternalTable(sqlCtx, "myjson", path="path/to/json", source="json")
+#' }
+
+createExternalTable <- function(sqlCtx, tableName, path = NULL, source = NULL, ...) {
+  sourceOptions <- varargsToEnv(...)
+  # A given path is passed along as the 'path' option.
+  if (!is.null(path)) {
+    sourceOptions[['path']] <- path
+  }
+  dataFrame(callJMethod(sqlCtx, "createExternalTable", tableName, source, sourceOptions))
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/SQLTypes.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLTypes.R b/R/pkg/R/SQLTypes.R
new file mode 100644
index 0000000..962fba5
--- /dev/null
+++ b/R/pkg/R/SQLTypes.R
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Utility functions for handling SparkSQL DataTypes.
+
+# Handler for StructType
+# Wraps the given reference in an environment of class "structType" that
+# exposes the reference as `jobj` and a `fields()` accessor.
+structType <- function(st) {
+  handler <- new.env(parent = emptyenv())
+  class(handler) <- "structType"
+  handler$jobj <- st
+  # Fields are fetched on every call and wrapped with structField().
+  handler$fields <- function() {
+    lapply(callJMethod(st, "fields"), structField)
+  }
+  handler
+}
+
+#' Print a Spark StructType.
+#'
+#' This function prints the contents of a StructType returned from the
+#' SparkR JVM backend.
+#'
+#' @param x A StructType object
+#' @param ... further arguments passed to or from other methods
+print.structType <- function(x, ...) {
+  # Print the list of each field's own textual representation.
+  print(lapply(x$fields(), function(field) { field$print() }))
+}
+
+# Handler for StructField
+# Wraps the given reference in an environment of class "structField" whose
+# members are accessor functions that query the reference on each call.
+structField <- function(sf) {
+  field <- new.env(parent = emptyenv())
+  class(field) <- "structField"
+  field$jobj <- sf
+  field$name <- function() {
+    callJMethod(sf, "name")
+  }
+  field$dataType <- function() {
+    callJMethod(sf, "dataType")
+  }
+  field$dataType.toString <- function() {
+    callJMethod(field$dataType(), "toString")
+  }
+  field$dataType.simpleString <- function() {
+    callJMethod(field$dataType(), "simpleString")
+  }
+  field$nullable <- function() {
+    callJMethod(sf, "nullable")
+  }
+  # Builds "StructField(<name>, <dataType>, <nullable>)".
+  field$print <- function() {
+    details <- paste(field$name(), field$dataType.toString(), field$nullable(), sep = ", ")
+    paste("StructField(", details, ")", sep = "")
+  }
+  field
+}
+
+#' Print a Spark StructField.
+#'
+#' This function prints the contents of a StructField returned from the
+#' SparkR JVM backend.
+#'
+#' @param x A StructField object
+#' @param ... further arguments passed to or from other methods
+print.structField <- function(x, ...) {
+  # Write the field's own textual representation, without a trailing newline.
+  text <- x$print()
+  cat(text)
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/backend.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/backend.R b/R/pkg/R/backend.R
new file mode 100644
index 0000000..2fb6fae
--- /dev/null
+++ b/R/pkg/R/backend.R
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Methods to call into SparkRBackend.
+
+
+# Returns TRUE if object is an instance of given class
+#
+# jobj - "jobj" reference to the object to test
+# className - name of the Java class, resolved with java.lang.Class.forName
+#
+# Stops if jobj does not inherit from "jobj".
+isInstanceOf <- function(jobj, className) {
+  # inherits() also works for objects that carry more than one class.
+  stopifnot(inherits(jobj, "jobj"))
+  cls <- callJStatic("java.lang.Class", "forName", className)
+  callJMethod(cls, "isInstance", jobj)
+}
+
+# Call a Java method named methodName on the object
+# specified by objId. objId should be a "jobj" returned
+# from the SparkRBackend.
+#
+# objId - "jobj" reference to the target object
+# methodName - name of the method to invoke
+# ... - arguments forwarded to invokeJava
+#
+# Stops if objId does not inherit from "jobj" or if isValidJobj() rejects it.
+callJMethod <- function(objId, methodName, ...) {
+  # inherits() also works for objects that carry more than one class.
+  stopifnot(inherits(objId, "jobj"))
+  if (!isValidJobj(objId)) {
+    stop("Invalid jobj ", objId$id,
+         ". If SparkR was restarted, Spark operations need to be re-executed.")
+  }
+  invokeJava(isStatic = FALSE, objId$id, methodName, ...)
+}
+
+# Call a static method on a specified className
+#
+# className - name of the Java class that owns the static method
+# methodName - name of the static method
+# ... - arguments forwarded unchanged to invokeJava
+#
+# Returns whatever invokeJava returns for the call.
+callJStatic <- function(className, methodName, ...) {
+ invokeJava(isStatic = TRUE, className, methodName, ...)
+}
+
+# Create a new object of the specified class name
+#
+# className - name of the Java class to instantiate
+# ... - constructor arguments, forwarded unchanged to invokeJava
+#
+# The request is sent as a static call whose method name is "<init>".
+newJObject <- function(className, ...) {
+ invokeJava(isStatic = TRUE, className, methodName = "<init>", ...)
+}
+
+# Remove an object from the SparkR backend. This is done
+# automatically when a jobj is garbage collected.
+#
+# objId - id of the object to remove
+#
+# Sent as the static call "rm" on "SparkRHandler"; isRemoveMethod()
+# recognizes exactly this combination.
+removeJObject <- function(objId) {
+ invokeJava(isStatic = TRUE, "SparkRHandler", "rm", objId)
+}
+
+# Returns TRUE when the call described by the arguments is the static
+# "rm" method on "SparkRHandler", i.e. the call issued by removeJObject().
+#
+# isStatic - whether the call is static
+# objId - object id or class name the call targets
+# methodName - name of the method being invoked
+isRemoveMethod <- function(isStatic, objId, methodName) {
+  isStatic == TRUE &&
+    objId == "SparkRHandler" &&
+    methodName == "rm"
+}
+
+# Invoke a Java method on the SparkR backend. Users
+# should typically use one of the higher level methods like
+# callJMethod, callJStatic etc. instead of using this.
+#
+# isStatic - TRUE if the method to be called is static
+# objId - String that refers to the object on which method is invoked
+#         Should be a jobj id for non-static methods and the classname
+#         for static methods
+# methodName - name of method to be invoked
+# ... - arguments of the Java method, serialized with writeArgs
+#
+# Returns the object read back from the backend connection with readObject.
+# Stops if no connection is registered in .sparkREnv or if the status read
+# from the backend is not 0.
+invokeJava <- function(isStatic, objId, methodName, ...) {
+  if (!exists(".sparkRCon", .sparkREnv)) {
+    stop("No connection to backend found. Please re-run sparkR.init")
+  }
+
+  # Flush the jobjs queued in .toRemoveJobjs before issuing the call.
+  # removeJObject() goes through invokeJava itself, so the flush is skipped
+  # when this already is a remove call.
+  if (!isRemoveMethod(isStatic, objId, methodName)) {
+    objsToRemove <- ls(.toRemoveJobjs)
+    if (length(objsToRemove) > 0) {
+      # Plain loop: the calls are made for their side effect only.
+      for (id in objsToRemove) {
+        removeJObject(id)
+      }
+      rm(list = objsToRemove, envir = .toRemoveJobjs)
+    }
+  }
+
+  # Runs `writer` against a fresh in-memory connection and returns the bytes
+  # it wrote. on.exit() closes the connection even when serialization stops
+  # with an error, so a failed call does not leak a raw connection.
+  collectBytes <- function(writer) {
+    rc <- rawConnection(raw(0), "r+")
+    on.exit(close(rc))
+    writer(rc)
+    rawConnectionValue(rc)
+  }
+
+  args <- list(...)
+  bytesToSend <- collectBytes(function(rc) {
+    writeBoolean(rc, isStatic)
+    writeString(rc, objId)
+    writeString(rc, methodName)
+    writeInt(rc, length(args))
+    writeArgs(rc, args)
+  })
+
+  # Construct the whole request message to send it once,
+  # avoiding write-write-read pattern in case of Nagle's algorithm.
+  # Refer to http://en.wikipedia.org/wiki/Nagle%27s_algorithm for the details.
+  requestMessage <- collectBytes(function(rc) {
+    writeInt(rc, length(bytesToSend))
+    writeBin(bytesToSend, rc)
+  })
+
+  conn <- get(".sparkRCon", .sparkREnv)
+  writeBin(requestMessage, conn)
+
+  # TODO: check the status code to output error information
+  returnStatus <- readInt(conn)
+  stopifnot(returnStatus == 0)
+  readObject(conn)
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/broadcast.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/broadcast.R b/R/pkg/R/broadcast.R
new file mode 100644
index 0000000..583fa2e
--- /dev/null
+++ b/R/pkg/R/broadcast.R
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# S4 class representing Broadcast variables
+
+# Hidden environments that hold the bookkeeping for broadcast variables.
+# These will not be serialized / shipped by default.
+#
+# .broadcastNames    - object name -> reference to the Java broadcast object
+# .broadcastValues   - broadcast id -> value of the broadcast variable
+# .broadcastIdToName - broadcast id -> object name
+.broadcastNames <- new.env()
+.broadcastValues <- new.env()
+.broadcastIdToName <- new.env()
+
+#' @title S4 class that represents a Broadcast variable
+#' @description Broadcast variables can be created using the broadcast
+#' function from a \code{SparkContext}. The object itself only carries the
+#' id; values are looked up by id in a package-level environment.
+#' @rdname broadcast-class
+#' @seealso broadcast
+#'
+#' @param id Id of the backing Spark broadcast variable
+#' @export
+setClass("Broadcast", slots = list(id = "character"))
+
+#' @rdname broadcast-class
+#' @param value Value of the broadcast variable
+#' @param jBroadcastRef reference to the backing Java broadcast object
+#' @param objName name of broadcasted object
+#' @export
+Broadcast <- function(id, value, jBroadcastRef, objName) {
+  # Record the value by id, the Java reference by object name and the
+  # id -> name mapping, then hand back an object that only carries the id.
+  .broadcastValues[[id]] <- value
+  nameKey <- as.character(objName)
+  .broadcastNames[[nameKey]] <- jBroadcastRef
+  .broadcastIdToName[[id]] <- as.character(objName)
+  new("Broadcast", id = id)
+}
+
+#' @description
+#' \code{value} can be used to get the value of a broadcast variable inside
+#' a distributed function. \code{NULL} is returned when no value is
+#' registered under the id of \code{bcast}.
+#'
+#' @param bcast The broadcast variable to get
+#' @rdname broadcast
+#' @aliases value,Broadcast-method
+setMethod("value",
+          signature(bcast = "Broadcast"),
+          function(bcast) {
+            # inherits = FALSE restricts the lookup to .broadcastValues
+            # itself; the default would also search its enclosing
+            # environments and could return an unrelated object whose name
+            # happens to equal the id.
+            if (exists(bcast@id, envir = .broadcastValues, inherits = FALSE)) {
+              get(bcast@id, envir = .broadcastValues, inherits = FALSE)
+            } else {
+              NULL
+            }
+          })
+
+#' Internal function to set values of a broadcast variable.
+#'
+#' This function is used internally by Spark to set the value of a broadcast
+#' variable on workers. Not intended for use outside the package.
+#'
+#' @rdname broadcast-internal
+#' @seealso broadcast, value
+#'
+#' @param bcastId The id of broadcast variable to set; it is converted to a
+#'   character string before being used as the key
+#' @param value The value to be set
+#' @export
+setBroadcastValue <- function(bcastId, value) {
+  .broadcastValues[[as.character(bcastId)]] <- value
+}
+
+#' Helper function to clear the list of broadcast variables we know about
+#' Should be called when the SparkR JVM backend is shutdown
+#'
+#' Removes every name listed by \code{ls()} from \code{.broadcastNames}.
+clearBroadcastVariables <- function() {
+  rm(list = ls(.broadcastNames), envir = .broadcastNames)
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/client.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R
new file mode 100644
index 0000000..1281c41
--- /dev/null
+++ b/R/pkg/R/client.R
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Client code to connect to SparkRBackend
+
+# Creates a SparkR client connection object
+# if one doesn't already exist
+#
+# hostname - host the SparkRBackend listens on
+# port - port the SparkRBackend listens on
+# timeout - timeout handed to socketConnection
+#
+# Returns the connection, which is also stored as ".sparkRCon" in .sparkREnv.
+connectBackend <- function(hostname, port, timeout = 6000) {
+  # The connection is stored under ".sparkRCon" (capital C); look it up under
+  # that exact name so that an existing connection is actually found.
+  if (exists(".sparkRCon", envir = .sparkREnv)) {
+    existing <- get(".sparkRCon", envir = .sparkREnv)
+    # isOpen() signals an error for a connection that was already closed;
+    # treat that case as "not open" and fall through to reconnect.
+    stillOpen <- tryCatch(isOpen(existing), error = function(e) FALSE)
+    if (stillOpen) {
+      cat("SparkRBackend client connection already exists\n")
+      return(existing)
+    }
+  }
+
+  con <- socketConnection(host = hostname, port = port, server = FALSE,
+                          blocking = TRUE, open = "wb", timeout = timeout)
+
+  assign(".sparkRCon", con, envir = .sparkREnv)
+  con
+}
+
+# Launches the backend through spark-submit without waiting for it to finish.
+#
+# args - arguments appended at the end of the spark-submit command line
+# sparkHome - Spark installation directory; when "" the bare spark-submit
+#             name is used instead of <sparkHome>/bin/<name>
+# jars - value for the --jars option; when "" the option is left out
+# sparkSubmitOpts - further options placed between the jars and args
+#
+# The command line is echoed with cat before it is started. The result of
+# system2 is returned invisibly.
+launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts) {
+  if (.Platform$OS.type == "unix") {
+    sparkSubmitBinName <- "spark-submit"
+  } else {
+    sparkSubmitBinName <- "spark-submit.cmd"
+  }
+
+  if (sparkHome != "") {
+    sparkSubmitBin <- file.path(sparkHome, "bin", sparkSubmitBinName)
+  } else {
+    sparkSubmitBin <- sparkSubmitBinName
+  }
+
+  if (jars != "") {
+    jars <- paste("--jars", jars)
+  }
+
+  combinedArgs <- paste(jars, sparkSubmitOpts, args, sep = " ")
+  cat("Launching java with spark-submit command", sparkSubmitBin, combinedArgs, "\n")
+  # FALSE rather than F: F is an ordinary variable that can be reassigned.
+  invisible(system2(sparkSubmitBin, combinedArgs, wait = FALSE))
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/column.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/column.R b/R/pkg/R/column.R
new file mode 100644
index 0000000..e196305
--- /dev/null
+++ b/R/pkg/R/column.R
@@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Column Class
+
+#' @include generics.R jobj.R
+NULL
+
+# Register the S3 class "jobj" with the S4 system; the Column class uses it
+# as the type of its jc slot.
+setOldClass("jobj")
+
+#' @title S4 class that represents a DataFrame column
+#' @description The column class supports unary, binary operations on DataFrame columns
+#'
+#' @rdname column
+#'
+#' @slot jc reference to JVM DataFrame column
+#' @export
+setClass("Column",
+ slots = list(jc = "jobj"))
+
+# Initializer for Column: stores the given JVM column reference in the jc
+# slot and returns the initialized object.
+setMethod("initialize", "Column", function(.Object, jc) {
+ .Object@jc <- jc
+ .Object
+})
+
+# Builds a Column object around a reference to a JVM column.
+#
+# jc - reference to the JVM column; it ends up in the jc slot
+column <- function(jc) {
+  new("Column", jc = jc)
+}
+
+# Returns a Column obtained from the static "col" function of
+# org.apache.spark.sql.functions.
+#
+# x - argument passed on to that function unchanged
+col <- function(x) {
+  jc <- callJStatic("org.apache.spark.sql.functions", "col", x)
+  column(jc)
+}
+
+#' @rdname show
+setMethod("show", "Column",
+          function(object) {
+            # Print the word "Column" followed by the JVM-side toString().
+            description <- callJMethod(object@jc, "toString")
+            cat("Column", description, "\n")
+          })
+
+# Maps each R operator to the name of the JVM column method that
+# createOperator() invokes for it.
+operators <- list(
+ "+" = "plus", "-" = "minus", "*" = "multiply", "/" = "divide", "%%" = "mod",
+ "==" = "equalTo", ">" = "gt", "<" = "lt", "!=" = "notEqual", "<=" = "leq", ">=" = "geq",
+ # we can not override `&&` and `||`, so use `&` and `|` instead
+ "&" = "and", "|" = "or" #, "!" = "unary_$bang"
+)
+# Column methods invoked without arguments (see createColumnFunction1).
+column_functions1 <- c("asc", "desc", "isNull", "isNotNull")
+# Column methods invoked with one argument (see createColumnFunction2).
+column_functions2 <- c("like", "rlike", "startsWith", "endsWith", "getField", "getItem", "contains")
+# Static functions of org.apache.spark.sql.functions applied to a column
+# (see createStaticFunction).
+functions <- c("min", "max", "sum", "avg", "mean", "count", "abs", "sqrt",
+ "first", "last", "lower", "upper", "sumDistinct")
+
+# Defines the method for operator `op` on Column objects.
+#
+# op - operator symbol as a string; must be a name of `operators`
+#
+# The generated method returns a new Column. With a single operand, "-"
+# calls the JVM method "unary_$minus" and any other operator calls its
+# mapped method without arguments. With two operands the mapped method is
+# called with e2, which is unwrapped to its JVM reference when it is a
+# Column.
+createOperator <- function(op) {
+  setMethod(op,
+            signature(e1 = "Column"),
+            function(e1, e2) {
+              jc <- if (missing(e2)) {
+                if (op == "-") {
+                  callJMethod(e1@jc, "unary_$minus")
+                } else {
+                  callJMethod(e1@jc, operators[[op]])
+                }
+              } else {
+                # inherits() yields a single TRUE/FALSE even when e2 carries
+                # several classes (e.g. POSIXct); comparing class(e2) with
+                # == would hand a longer vector to `if`.
+                if (inherits(e2, "Column")) {
+                  e2 <- e2@jc
+                }
+                callJMethod(e1@jc, operators[[op]], e2)
+              }
+              column(jc)
+            })
+}
+
+# Defines a method called `name` on Column that invokes the JVM column
+# method of the same name without arguments and wraps the result in a
+# Column.
+createColumnFunction1 <- function(name) {
+  setMethod(name,
+            signature(x = "Column"),
+            function(x) {
+              jc <- callJMethod(x@jc, name)
+              column(jc)
+            })
+}
+
+# Defines a method called `name` on Column that invokes the JVM column
+# method of the same name with one argument and wraps the result in a
+# Column. A Column argument is unwrapped to its JVM reference first.
+createColumnFunction2 <- function(name) {
+  setMethod(name,
+            signature(x = "Column"),
+            function(x, data) {
+              # inherits() yields a single TRUE/FALSE even when `data`
+              # carries several classes; comparing class(data) with ==
+              # would hand a longer vector to `if`.
+              if (inherits(data, "Column")) {
+                data <- data@jc
+              }
+              jc <- callJMethod(x@jc, name, data)
+              column(jc)
+            })
+}
+
+# Defines a method called `name` on Column that calls the static function
+# of the same name in org.apache.spark.sql.functions with the column's JVM
+# reference and wraps the result in a Column.
+createStaticFunction <- function(name) {
+  setMethod(name,
+            signature(x = "Column"),
+            function(x) {
+              column(callJStatic("org.apache.spark.sql.functions", name, x@jc))
+            })
+}
+
+# Registers all generated Column methods: one per entry of `operators`,
+# `column_functions1`, `column_functions2` and `functions`, in that order.
+createMethods <- function() {
+  lapply(names(operators), createOperator)
+  lapply(column_functions1, createColumnFunction1)
+  lapply(column_functions2, createColumnFunction2)
+  lapply(functions, createStaticFunction)
+  invisible(NULL)
+}
+
+# Run once when this file is evaluated.
+createMethods()
+
+#' alias
+#'
+#' Set a new name for a column
+#'
+#' Stops unless \code{data} is a character value; otherwise the JVM column
+#' method "as" is called with it and the result is returned as a Column.
+setMethod("alias",
+          signature(object = "Column"),
+          function(object, data) {
+            if (!is.character(data)) {
+              stop("data should be character")
+            }
+            column(callJMethod(object@jc, "as", data))
+          })
+
+#' An expression that returns a substring.
+#'
+#' The JVM column method "substr" is called with \code{start - 1} and
+#' \code{stop - start + 1}, both converted to integers.
+#'
+#' @param start starting position
+#' @param stop ending position
+setMethod("substr", signature(x = "Column"),
+          function(x, start, stop) {
+            firstArg <- as.integer(start - 1)
+            secondArg <- as.integer(stop - start + 1)
+            column(callJMethod(x@jc, "substr", firstArg, secondArg))
+          })
+
+#' Casts the column to a different data type.
+#'
+#' \code{dataType} is either a character value, which is passed to the JVM
+#' "cast" method as is, or a list, which is converted with \code{tojson}
+#' and turned into a JVM data type through \code{DataType.fromJson} first.
+#' Anything else stops with an error.
+#' @examples
+#' \dontrun{
+#' cast(df$age, "string")
+#' cast(df$name, list(type="array", elementType="byte", containsNull = TRUE))
+#' }
+setMethod("cast",
+          signature(x = "Column"),
+          function(x, dataType) {
+            if (is.character(dataType)) {
+              target <- dataType
+            } else if (is.list(dataType)) {
+              target <- callJStatic("org.apache.spark.sql.types.DataType",
+                                    "fromJson", tojson(dataType))
+            } else {
+              stop("dataType should be character or list")
+            }
+            column(callJMethod(x@jc, "cast", target))
+          })
+
+#' Approx Count Distinct
+#'
+#' Returns the approximate number of distinct items in a group.
+#'
+#' @param x the Column to count
+#' @param rsd value passed as second argument to the JVM function
+#'   \code{approxCountDistinct}; defaults to 0.95
+setMethod("approxCountDistinct",
+          signature(x = "Column"),
+          function(x, rsd = 0.95) {
+            column(callJStatic("org.apache.spark.sql.functions",
+                               "approxCountDistinct", x@jc, rsd))
+          })
+
+#' Count Distinct
+#'
+#' returns the number of distinct items in a group.
+#'
+#' @param x the first Column
+#' @param ... further Columns; their JVM references are collected with
+#'   \code{listToSeq} and passed as the second argument
+setMethod("countDistinct",
+          signature(x = "Column"),
+          function(x, ...) {
+            others <- list(...)
+            otherRefs <- lapply(others, function(other) other@jc)
+            jc <- callJStatic("org.apache.spark.sql.functions", "countDistinct",
+                              x@jc, listToSeq(otherRefs))
+            column(jc)
+          })
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org