You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sh...@apache.org on 2015/04/09 07:46:00 UTC
[4/7] spark git commit: [SPARK-5654] Integrate SparkR
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/context.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
new file mode 100644
index 0000000..2fc0bb2
--- /dev/null
+++ b/R/pkg/R/context.R
@@ -0,0 +1,225 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# context.R: SparkContext driven functions
+
+# Resolve the minimum number of input splits for a file-reading call.
+# A NULL minSplits falls back to min(defaultParallelism, 2), where
+# defaultParallelism is obtained through callJMethod(sc, "defaultParallelism").
+# The result is always coerced to an integer.
+getMinSplits <- function(sc, minSplits) {
+  resolved <- minSplits
+  if (is.null(resolved)) {
+    resolved <- min(callJMethod(sc, "defaultParallelism"), 2)
+  }
+  as.integer(resolved)
+}
+
+#' Create an RDD from a text file.
+#'
+#' This function reads a text file from HDFS, a local file system (available on all
+#' nodes), or any Hadoop-supported file system URI, and creates an
+#' RDD of strings from it.
+#'
+#' @param sc SparkContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed; it
+#' must contain at least one path.
+#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' value is chosen based on available parallelism.
+#' @return RDD where each item is of type \code{character}
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' lines <- textFile(sc, "myfile.txt")
+#'}
+textFile <- function(sc, path, minSplits = NULL) {
+  # An empty vector would otherwise be collapsed to "" and sent to the JVM.
+  if (length(path) == 0) {
+    stop("path must contain at least one file path")
+  }
+  # Allow the user to have a more flexible definition of the text file path
+  # (e.g. "~" expansion). normalizePath warns for paths it cannot resolve,
+  # such as non-local URIs; those warnings are suppressed.
+  path <- suppressWarnings(normalizePath(path))
+  # Convert a string vector of paths to a string containing comma separated paths
+  path <- paste(path, collapse = ",")
+
+  jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits))
+  # jrdd is of type JavaRDD[String]
+  RDD(jrdd, "string")
+}
+
+#' Load an RDD saved as a SequenceFile containing serialized objects.
+#'
+#' The file to be loaded should be one that was previously generated by calling
+#' saveAsObjectFile() of the RDD class.
+#'
+#' @param sc SparkContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed; it
+#' must contain at least one path.
+#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#' value is chosen based on available parallelism.
+#' @return RDD containing serialized R objects.
+#' @seealso saveAsObjectFile
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- objectFile(sc, "myfile")
+#'}
+objectFile <- function(sc, path, minSplits = NULL) {
+  # An empty vector would otherwise be collapsed to "" and sent to the JVM.
+  if (length(path) == 0) {
+    stop("path must contain at least one file path")
+  }
+  # Allow the user to have a more flexible definition of the file path
+  # (e.g. "~" expansion). normalizePath warns for paths it cannot resolve,
+  # such as non-local URIs; those warnings are suppressed.
+  path <- suppressWarnings(normalizePath(path))
+  # Convert a string vector of paths to a string containing comma separated paths
+  path <- paste(path, collapse = ",")
+
+  jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits))
+  # Assume the RDD contains serialized R objects.
+  RDD(jrdd, "byte")
+}
+
+#' Create an RDD from a homogeneous list or vector.
+#'
+#' This function creates an RDD from a local homogeneous list in R. The elements
+#' in the list are split into \code{numSlices} slices and distributed to nodes
+#' in the cluster. Consecutive elements are kept together and the slice sizes
+#' differ by at most one.
+#'
+#' @param sc SparkContext to use
+#' @param coll collection to parallelize
+#' @param numSlices number of partitions to create in the RDD. Must be a single
+#' number >= 1; it is capped at the length of \code{coll}. An empty
+#' \code{coll} yields a single empty partition.
+#' @return an RDD created from this collection
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2)
+#' # The RDD should contain 10 elements
+#' length(rdd)
+#'}
+parallelize <- function(sc, coll, numSlices = 1) {
+  # TODO: unit tests for if the split works for all primitives
+  # TODO: support matrix, data frame, etc
+  if ((!is.list(coll) && !is.vector(coll)) || is.data.frame(coll)) {
+    if (is.data.frame(coll)) {
+      message(paste("context.R: A data frame is parallelized by columns."))
+    } else {
+      if (is.matrix(coll)) {
+        message(paste("context.R: A matrix is parallelized by elements."))
+      } else {
+        message(paste("context.R: parallelize() currently only supports lists and vectors.",
+                      "Calling as.list() to coerce coll into a list."))
+      }
+    }
+    coll <- as.list(coll)
+  }
+
+  if (!is.numeric(numSlices) || length(numSlices) != 1 || is.na(numSlices) ||
+      numSlices < 1) {
+    stop("numSlices must be a single number >= 1")
+  }
+
+  len <- length(coll)
+  # Never create more slices than there are elements, but always at least one.
+  numSlices <- as.integer(max(1, min(numSlices, len)))
+
+  if (len == 0) {
+    # An empty collection becomes one empty slice instead of failing.
+    slices <- list(coll)
+  } else {
+    # Slice i (1-based) receives elements bounds[i] + 1 .. bounds[i + 1].
+    # Because len >= numSlices, every slice is non-empty, so exactly numSlices
+    # partitions are created. A fixed ceiling(len / numSlices) chunk size could
+    # produce fewer (e.g. 3 instead of 4 slices for 5 elements).
+    bounds <- floor(seq(0, numSlices) * len / numSlices)
+    slices <- split(coll, rep(seq_len(numSlices), diff(bounds)))
+  }
+
+  # Serialize each slice: obtain a list of raws, or a list of lists (slices) of
+  # 2-tuples of raws
+  serializedSlices <- lapply(slices, serialize, connection = NULL)
+
+  jrdd <- callJStatic("org.apache.spark.api.r.RRDD",
+                      "createRDDFromArray", sc, serializedSlices)
+
+  RDD(jrdd, "byte")
+}
+
+#' Include this specified package on all workers
+#'
+#' This function can be used to include a package on all workers before the
+#' user's code is executed. This is useful in scenarios where other R package
+#' functions are used in a function passed to functions like \code{lapply}.
+#' NOTE: The package is assumed to be installed on every node in the Spark
+#' cluster.
+#'
+#' @param sc SparkContext to use
+#' @param pkg Package name, given either as a bare name or as a string
+#' @return Invisibly, the character vector of all package names recorded so far
+#'
+#' @export
+#' @examples
+#'\dontrun{
+#' library(Matrix)
+#'
+#' sc <- sparkR.init()
+#' # Include the matrix library we will be using
+#' includePackage(sc, Matrix)
+#'
+#' generateSparse <- function(x) {
+#' sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
+#' }
+#'
+#' rdd <- lapplyPartition(parallelize(sc, 1:2, 2L), generateSparse)
+#' collect(rdd)
+#'}
+includePackage <- function(sc, pkg) {
+  # Capture the unevaluated argument so includePackage(sc, Matrix) works.
+  pkg <- as.character(substitute(pkg))
+  # Look only inside .sparkREnv itself. With the default inherits = TRUE,
+  # exists() can find base::.packages through enclosing environments and
+  # report TRUE before any package has been recorded here.
+  if (exists(".packages", envir = .sparkREnv, inherits = FALSE)) {
+    packages <- .sparkREnv$.packages
+  } else {
+    packages <- character(0)
+  }
+  packages <- c(packages, pkg)
+  .sparkREnv$.packages <- packages
+}
+
+#' @title Broadcast a variable to all workers
+#'
+#' @description
+#' Broadcast a read-only variable to the cluster, returning a \code{Broadcast}
+#' object for reading it in distributed functions.
+#'
+#' @param sc Spark Context to use
+#' @param object Object to be broadcast
+#' @return a \code{Broadcast} object
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:2, 2L)
+#'
+#' # Large Matrix object that we want to broadcast
+#' randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000))
+#' randomMatBr <- broadcast(sc, randomMat)
+#'
+#' # Use the broadcast variable inside the function
+#' useBroadcast <- function(x) {
+#' sum(value(randomMatBr) * x)
+#' }
+#' sumRDD <- lapply(rdd, useBroadcast)
+#'}
+broadcast <- function(sc, object) {
+  # Record how the object was written at the call site. A bare variable name
+  # (or literal) is kept as before; a call such as x$y or f(x) is deparsed into
+  # ONE string, because as.character() on a call returns one string per call
+  # component (e.g. c("$", "x", "y")).
+  objExpr <- substitute(object)
+  objName <- if (is.call(objExpr)) {
+    paste(deparse(objExpr), collapse = "")
+  } else {
+    as.character(objExpr)
+  }
+  serializedObj <- serialize(object, connection = NULL)
+
+  jBroadcast <- callJMethod(sc, "broadcast", serializedObj)
+  id <- as.character(callJMethod(jBroadcast, "id"))
+
+  Broadcast(id, object, jBroadcast, objName)
+}
+
+#' @title Set the checkpoint directory
+#'
+#' @description
+#' Set the directory under which RDDs are going to be checkpointed. The
+#' directory must be a HDFS path if running on a cluster.
+#'
+#' @param sc Spark Context to use
+#' @param dirName Directory path. It is passed through \code{normalizePath}
+#' (which performs tilde expansion) before being handed to Spark.
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' setCheckpointDir(sc, "~/checkpoints")
+#' rdd <- parallelize(sc, 1:2, 2L)
+#' checkpoint(rdd)
+#'}
+setCheckpointDir <- function(sc, dirName) {
+  # normalizePath warns for paths it cannot resolve (e.g. non-local URIs);
+  # those warnings are suppressed. The JVM call's result is returned invisibly.
+  invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/deserialize.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R
new file mode 100644
index 0000000..257b435
--- /dev/null
+++ b/R/pkg/R/deserialize.R
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Utility functions to deserialize objects from Java.
+
+# Type mapping from Java to R
+#
+# void -> NULL
+# Int -> integer
+# String -> character
+# Boolean -> logical
+# Double -> double
+# Long -> double
+# Array[Byte] -> raw
+# Date -> Date
+# Time -> POSIXct
+#
+# Array[T] -> list()
+# Object -> jobj
+
+# Read one tagged value: a one-character type tag followed by its payload.
+readObject <- function(con) {
+  typeTag <- readType(con)
+  readTypedObject(con, typeTag)
+}
+
+# Decode the payload that follows a type tag; see the type mapping above.
+# Unknown tags raise an error naming the offending tag.
+readTypedObject <- function(con, type) {
+  switch(type,
+         # scalars
+         "i" = readInt(con),
+         "d" = readDouble(con),
+         "b" = readBoolean(con),
+         "c" = readString(con),
+         "r" = readRaw(con),
+         # temporal values
+         "D" = readDate(con),
+         "t" = readTime(con),
+         # containers and object references
+         "l" = readList(con),
+         "j" = getJobj(readString(con)),
+         # null
+         "n" = NULL,
+         stop("Unsupported type for deserialization ", type))
+}
+
+# Read a length-prefixed string: a 4-byte big-endian byte count, then the bytes.
+readString <- function(con) {
+  stringLen <- readInt(con)
+  bytes <- readBin(con, raw(), stringLen, endian = "big")
+  string <- rawToChar(bytes)
+  # Mark the result as UTF-8 so non-ASCII text is interpreted correctly even
+  # when the R session's native encoding is not UTF-8 (ASCII strings are left
+  # as "unknown"). NOTE(review): assumes the JVM side writes UTF-8 bytes --
+  # confirm against the backend's writeString.
+  Encoding(string) <- "UTF-8"
+  string
+}
+
+# Read one 4-byte big-endian signed integer (integer(0) at end of stream).
+readInt <- function(con) {
+  readBin(con, what = "integer", n = 1L, size = 4L, endian = "big")
+}
+
+# Read one 8-byte big-endian double.
+readDouble <- function(con) {
+  readBin(con, what = "double", n = 1L, size = 8L, endian = "big")
+}
+
+# Booleans travel as integers: zero is FALSE, any other value is TRUE.
+readBoolean <- function(con) {
+  readInt(con) != 0L
+}
+
+# Read the one-byte type tag (e.g. "i", "c", "j"); "" at end of stream.
+readType <- function(con) {
+  tagByte <- readBin(con, raw(), n = 1L)
+  rawToChar(tagByte)
+}
+
+# Dates arrive as strings and are parsed with as.Date().
+readDate <- function(con) {
+  dateString <- readString(con)
+  as.Date(dateString)
+}
+
+# Times arrive as seconds since the Unix epoch and become POSIXct values
+# (displayed in the session time zone).
+readTime <- function(con) {
+  as.POSIXct(readDouble(con), origin = "1970-01-01")
+}
+
+# We only support lists where all elements are of same type.
+# Wire format: one type tag shared by all elements, the element count, then
+# that many payloads of that type.
+readList <- function(con) {
+  type <- readType(con)
+  len <- readInt(con)
+  if (len <= 0) {
+    return(list())
+  }
+  # lapply keeps NULL results (e.g. for type "n"), whereas the assignment
+  # `l[[i]] <- NULL` would delete element i and shrink the list.
+  lapply(seq_len(len), function(i) readTypedObject(con, type))
+}
+
+# Read a length-prefixed byte array: a 4-byte big-endian length, then the bytes.
+# The result is returned directly; the original stored it in an unused local,
+# which made the return value invisible.
+readRaw <- function(con) {
+  dataLen <- readInt(con)
+  readBin(con, raw(), as.integer(dataLen), endian = "big")
+}
+
+# Read exactly dataLen bytes; the length is supplied by the caller rather than
+# read from con. Returned directly (a dead local assignment made it invisible).
+readRawLen <- function(con, dataLen) {
+  readBin(con, raw(), as.integer(dataLen), endian = "big")
+}
+
+readDeserialize <- function(con) {
+  # The stream holds one or more length-prefixed, R-serialized chunks.
+  # If the whole partition arrived as a single chunk it is returned as-is;
+  # otherwise the chunks are concatenated one level deep. Reading stops at
+  # end of stream or at a non-positive length.
+  readChunk <- function(len) {
+    unserialize(readBin(con, raw(), as.integer(len), endian = "big"))
+  }
+
+  firstLen <- readInt(con)
+  first <- readChunk(firstLen)
+  nextLen <- readInt(con)
+  if (length(nextLen) == 0 || !(nextLen > 0)) {
+    return(first)
+  }
+
+  chunks <- list(first)
+  repeat {
+    chunks[[length(chunks) + 1L]] <- readChunk(nextLen)
+    nextLen <- readInt(con)
+    if (length(nextLen) == 0 || !(nextLen > 0)) {
+      break
+    }
+  }
+  unlist(chunks, recursive = FALSE)
+}
+
+readDeserializeRows <- function(inputCon) {
+  # Deserialize a DataOutputStream holding a sequence of rows. The row count
+  # is not sent up front, so keep calling readRow() until it returns an empty
+  # row, which terminates the stream.
+  rows <- list()
+  repeat {
+    nextRow <- readRow(inputCon)
+    if (length(nextRow) == 0) {
+      break
+    }
+    rows[[length(rows) + 1L]] <- nextRow
+  }
+  rows # a list of rows, each row a list of column values
+}
+
+readRowList <- function(obj) {
+  # Intended for use inside lapply(): each obj is a raw vector holding one
+  # serialized row. Wrap it in its own connection so readRow() can consume
+  # the column count and values; the connection is closed on exit.
+  rowCon <- rawConnection(obj, "r+")
+  on.exit(close(rowCon))
+  readRow(rowCon)
+}
+
+readRow <- function(inputCon) {
+  numCols <- readInt(inputCon)
+  # A missing (end of stream) or non-positive column count yields an empty row.
+  if (length(numCols) == 0 || !(numCols > 0)) {
+    return(list())
+  }
+  # Read the columns in order; NULL values are replaced with NA.
+  lapply(seq_len(numCols), function(colIdx) {
+    value <- readObject(inputCon)
+    if (is.null(value)) NA else value
+  })
+}
+
+# Take a single column as Array[Byte] and deserialize it into an atomic vector.
+# Reads exactly numRows values; NULL values are replaced by NA.
+readCol <- function(inputCon, numRows) {
+  # seq_len() rather than 1:numRows: for numRows == 0, 1:0 is c(1, 0) and
+  # would read two values from the stream.
+  # sapply can not work with POSIXlt, hence do.call(c, lapply(...)).
+  do.call(c, lapply(seq_len(numRows), function(x) {
+    value <- readObject(inputCon)
+    # Replace NULL with NA so we can coerce to vectors
+    if (is.null(value)) NA else value
+  }))
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
new file mode 100644
index 0000000..5fb1cca
--- /dev/null
+++ b/R/pkg/R/generics.R
@@ -0,0 +1,543 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+############ RDD Actions and Transformations ############
+# S4 generic declarations only; no setMethod() implementations are in this file.
+
+#' @rdname aggregateRDD
+#' @seealso reduce
+#' @export
+setGeneric("aggregateRDD", function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })
+
+#' @rdname cache-methods
+#' @export
+setGeneric("cache", function(x) { standardGeneric("cache") })
+
+#' @rdname coalesce
+#' @seealso repartition
+#' @export
+setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") })
+
+#' @rdname checkpoint-methods
+#' @export
+setGeneric("checkpoint", function(x) { standardGeneric("checkpoint") })
+
+#' @rdname collect-methods
+#' @export
+setGeneric("collect", function(x, ...) { standardGeneric("collect") })
+
+#' @rdname collect-methods
+#' @export
+setGeneric("collectAsMap", function(x) { standardGeneric("collectAsMap") })
+
+#' @rdname collect-methods
+#' @export
+setGeneric("collectPartition",
+ function(x, partitionId) {
+ standardGeneric("collectPartition")
+ })
+
+#' @rdname count
+#' @export
+setGeneric("count", function(x) { standardGeneric("count") })
+
+#' @rdname countByValue
+#' @export
+setGeneric("countByValue", function(x) { standardGeneric("countByValue") })
+
+#' @rdname distinct
+#' @export
+setGeneric("distinct", function(x, numPartitions = 1L) { standardGeneric("distinct") })
+
+#' @rdname filterRDD
+#' @export
+setGeneric("filterRDD", function(x, f) { standardGeneric("filterRDD") })
+
+#' @rdname first
+#' @export
+setGeneric("first", function(x) { standardGeneric("first") })
+
+#' @rdname flatMap
+#' @export
+setGeneric("flatMap", function(X, FUN) { standardGeneric("flatMap") })
+
+#' @rdname fold
+#' @seealso reduce
+#' @export
+setGeneric("fold", function(x, zeroValue, op) { standardGeneric("fold") })
+
+#' @rdname foreach
+#' @export
+setGeneric("foreach", function(x, func) { standardGeneric("foreach") })
+
+#' @rdname foreach
+#' @export
+setGeneric("foreachPartition", function(x, func) { standardGeneric("foreachPartition") })
+
+# The jrdd accessor function (internal: it carries no @export tag).
+setGeneric("getJRDD", function(rdd, ...) { standardGeneric("getJRDD") })
+
+#' @rdname glom
+#' @export
+setGeneric("glom", function(x) { standardGeneric("glom") })
+
+#' @rdname keyBy
+#' @export
+setGeneric("keyBy", function(x, func) { standardGeneric("keyBy") })
+
+#' @rdname lapplyPartition
+#' @export
+setGeneric("lapplyPartition", function(X, FUN) { standardGeneric("lapplyPartition") })
+
+#' @rdname lapplyPartitionsWithIndex
+#' @export
+setGeneric("lapplyPartitionsWithIndex",
+ function(X, FUN) {
+ standardGeneric("lapplyPartitionsWithIndex")
+ })
+
+#' @rdname lapply
+#' @export
+setGeneric("map", function(X, FUN) { standardGeneric("map") })
+
+#' @rdname lapplyPartition
+#' @export
+setGeneric("mapPartitions", function(X, FUN) { standardGeneric("mapPartitions") })
+
+#' @rdname lapplyPartitionsWithIndex
+#' @export
+setGeneric("mapPartitionsWithIndex",
+ function(X, FUN) { standardGeneric("mapPartitionsWithIndex") })
+
+#' @rdname maximum
+#' @export
+setGeneric("maximum", function(x) { standardGeneric("maximum") })
+
+#' @rdname minimum
+#' @export
+setGeneric("minimum", function(x) { standardGeneric("minimum") })
+
+#' @rdname sumRDD
+#' @export
+setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })
+
+#' @rdname name
+#' @export
+setGeneric("name", function(x) { standardGeneric("name") })
+
+#' @rdname numPartitions
+#' @export
+setGeneric("numPartitions", function(x) { standardGeneric("numPartitions") })
+
+#' @rdname persist
+#' @export
+setGeneric("persist", function(x, newLevel) { standardGeneric("persist") })
+
+#' @rdname pipeRDD
+#' @export
+setGeneric("pipeRDD", function(x, command, env = list()) { standardGeneric("pipeRDD")})
+
+#' @rdname reduce
+#' @export
+setGeneric("reduce", function(x, func) { standardGeneric("reduce") })
+
+#' @rdname repartition
+#' @seealso coalesce
+#' @export
+setGeneric("repartition", function(x, numPartitions) { standardGeneric("repartition") })
+
+#' @rdname sampleRDD
+#' @export
+setGeneric("sampleRDD",
+ function(x, withReplacement, fraction, seed) {
+ standardGeneric("sampleRDD")
+ })
+
+#' @rdname saveAsObjectFile
+#' @seealso objectFile
+#' @export
+setGeneric("saveAsObjectFile", function(x, path) { standardGeneric("saveAsObjectFile") })
+
+#' @rdname saveAsTextFile
+#' @export
+setGeneric("saveAsTextFile", function(x, path) { standardGeneric("saveAsTextFile") })
+
+#' @rdname setName
+#' @export
+setGeneric("setName", function(x, name) { standardGeneric("setName") })
+
+#' @rdname sortBy
+#' @export
+setGeneric("sortBy",
+ function(x, func, ascending = TRUE, numPartitions = 1L) {
+ standardGeneric("sortBy")
+ })
+
+#' @rdname take
+#' @export
+setGeneric("take", function(x, num) { standardGeneric("take") })
+
+#' @rdname takeOrdered
+#' @export
+setGeneric("takeOrdered", function(x, num) { standardGeneric("takeOrdered") })
+
+#' @rdname takeSample
+#' @export
+setGeneric("takeSample",
+ function(x, withReplacement, num, seed) {
+ standardGeneric("takeSample")
+ })
+
+#' @rdname top
+#' @export
+setGeneric("top", function(x, num) { standardGeneric("top") })
+
+#' @rdname unionRDD
+#' @export
+setGeneric("unionRDD", function(x, y) { standardGeneric("unionRDD") })
+
+#' @rdname unpersist-methods
+#' @export
+setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
+
+#' @rdname zipRDD
+#' @export
+setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })
+
+#' @rdname zipWithIndex
+#' @seealso zipWithUniqueId
+#' @export
+setGeneric("zipWithIndex", function(x) { standardGeneric("zipWithIndex") })
+
+#' @rdname zipWithUniqueId
+#' @seealso zipWithIndex
+#' @export
+setGeneric("zipWithUniqueId", function(x) { standardGeneric("zipWithUniqueId") })
+
+
+############ Binary Functions #############
+# Judging by their names (countByKey, keys, values, lookup, ...), these
+# generics presumably target RDDs of key/value pairs -- confirm in the methods.
+
+#' @rdname countByKey
+#' @export
+setGeneric("countByKey", function(x) { standardGeneric("countByKey") })
+
+#' @rdname flatMapValues
+#' @export
+setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues") })
+
+#' @rdname keys
+#' @export
+setGeneric("keys", function(x) { standardGeneric("keys") })
+
+#' @rdname lookup
+#' @export
+setGeneric("lookup", function(x, key) { standardGeneric("lookup") })
+
+#' @rdname mapValues
+#' @export
+setGeneric("mapValues", function(X, FUN) { standardGeneric("mapValues") })
+
+#' @rdname values
+#' @export
+setGeneric("values", function(x) { standardGeneric("values") })
+
+
+
+############ Shuffle Functions ############
+# Most generics here take a numPartitions argument. cogroup is declared with
+# signature = "...", so it dispatches on its ... arguments rather than on x.
+
+#' @rdname aggregateByKey
+#' @seealso foldByKey, combineByKey
+#' @export
+setGeneric("aggregateByKey",
+ function(x, zeroValue, seqOp, combOp, numPartitions) {
+ standardGeneric("aggregateByKey")
+ })
+
+#' @rdname cogroup
+#' @export
+setGeneric("cogroup",
+ function(..., numPartitions) {
+ standardGeneric("cogroup")
+ },
+ signature = "...")
+
+#' @rdname combineByKey
+#' @seealso groupByKey, reduceByKey
+#' @export
+setGeneric("combineByKey",
+ function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) {
+ standardGeneric("combineByKey")
+ })
+
+#' @rdname foldByKey
+#' @seealso aggregateByKey, combineByKey
+#' @export
+setGeneric("foldByKey",
+ function(x, zeroValue, func, numPartitions) {
+ standardGeneric("foldByKey")
+ })
+
+#' @rdname join-methods
+#' @export
+setGeneric("fullOuterJoin", function(x, y, numPartitions) { standardGeneric("fullOuterJoin") })
+
+#' @rdname groupByKey
+#' @seealso reduceByKey
+#' @export
+setGeneric("groupByKey", function(x, numPartitions) { standardGeneric("groupByKey") })
+
+#' @rdname join-methods
+#' @export
+setGeneric("join", function(x, y, ...) { standardGeneric("join") })
+
+#' @rdname join-methods
+#' @export
+setGeneric("leftOuterJoin", function(x, y, numPartitions) { standardGeneric("leftOuterJoin") })
+
+#' @rdname partitionBy
+#' @export
+setGeneric("partitionBy", function(x, numPartitions, ...) { standardGeneric("partitionBy") })
+
+#' @rdname reduceByKey
+#' @seealso groupByKey
+#' @export
+setGeneric("reduceByKey", function(x, combineFunc, numPartitions) { standardGeneric("reduceByKey")})
+
+#' @rdname reduceByKeyLocally
+#' @seealso reduceByKey
+#' @export
+setGeneric("reduceByKeyLocally",
+ function(x, combineFunc) {
+ standardGeneric("reduceByKeyLocally")
+ })
+
+#' @rdname join-methods
+#' @export
+setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("rightOuterJoin") })
+
+#' @rdname sortByKey
+#' @export
+setGeneric("sortByKey", function(x, ascending = TRUE, numPartitions = 1L) {
+ standardGeneric("sortByKey")
+})
+
+
+################### Broadcast Variable Methods #################
+# value() shares the @rdname broadcast documentation page.
+
+#' @rdname broadcast
+#' @export
+setGeneric("value", function(bcast) { standardGeneric("value") })
+
+
+
+#################### DataFrame Methods ########################
+# Note: filter and intersect share their names with stats::filter and
+# base::intersect. saveAsTable and saveDF name their first argument df, not x.
+
+#' @rdname schema
+#' @export
+setGeneric("columns", function(x) {standardGeneric("columns") })
+
+#' @rdname schema
+#' @export
+setGeneric("dtypes", function(x) { standardGeneric("dtypes") })
+
+#' @rdname explain
+#' @export
+setGeneric("explain", function(x, ...) { standardGeneric("explain") })
+
+#' @rdname filter
+#' @export
+setGeneric("filter", function(x, condition) { standardGeneric("filter") })
+
+#' @rdname DataFrame
+#' @export
+setGeneric("groupBy", function(x, ...) { standardGeneric("groupBy") })
+
+#' @rdname insertInto
+#' @export
+setGeneric("insertInto", function(x, tableName, ...) { standardGeneric("insertInto") })
+
+#' @rdname intersect
+#' @export
+setGeneric("intersect", function(x, y) { standardGeneric("intersect") })
+
+#' @rdname isLocal
+#' @export
+setGeneric("isLocal", function(x) { standardGeneric("isLocal") })
+
+#' @rdname limit
+#' @export
+setGeneric("limit", function(x, num) {standardGeneric("limit") })
+
+#' @rdname sortDF
+#' @export
+setGeneric("orderBy", function(x, col) { standardGeneric("orderBy") })
+
+#' @rdname schema
+#' @export
+setGeneric("printSchema", function(x) { standardGeneric("printSchema") })
+
+#' @rdname registerTempTable
+#' @export
+setGeneric("registerTempTable", function(x, tableName) { standardGeneric("registerTempTable") })
+
+#' @rdname sampleDF
+#' @export
+setGeneric("sampleDF",
+ function(x, withReplacement, fraction, seed) {
+ standardGeneric("sampleDF")
+ })
+
+#' @rdname saveAsParquetFile
+#' @export
+setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
+
+#' @rdname saveAsTable
+#' @export
+setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
+ standardGeneric("saveAsTable")
+})
+
+#' @rdname saveAsTable
+#' @export
+setGeneric("saveDF", function(df, path, source, mode, ...) { standardGeneric("saveDF") })
+
+#' @rdname schema
+#' @export
+setGeneric("schema", function(x) { standardGeneric("schema") })
+
+#' @rdname select
+#' @export
+setGeneric("select", function(x, col, ...) { standardGeneric("select") } )
+
+#' @rdname select
+#' @export
+setGeneric("selectExpr", function(x, expr, ...) { standardGeneric("selectExpr") })
+
+#' @rdname showDF
+#' @export
+setGeneric("showDF", function(x,...) { standardGeneric("showDF") })
+
+#' @rdname sortDF
+#' @export
+setGeneric("sortDF", function(x, col, ...) { standardGeneric("sortDF") })
+
+#' @rdname subtract
+#' @export
+setGeneric("subtract", function(x, y) { standardGeneric("subtract") })
+
+#' @rdname tojson
+#' @export
+setGeneric("toJSON", function(x) { standardGeneric("toJSON") })
+
+#' @rdname DataFrame
+#' @export
+setGeneric("toRDD", function(x) { standardGeneric("toRDD") })
+
+#' @rdname unionAll
+#' @export
+setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })
+
+#' @rdname filter
+#' @export
+setGeneric("where", function(x, condition) { standardGeneric("where") })
+
+#' @rdname withColumn
+#' @export
+setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn") })
+
+#' @rdname withColumnRenamed
+#' @export
+setGeneric("withColumnRenamed", function(x, existingCol, newCol) {
+ standardGeneric("withColumnRenamed") })
+
+
+###################### Column Methods ##########################
+# Every generic in this section is documented on the shared @rdname column page.
+
+#' @rdname column
+#' @export
+setGeneric("approxCountDistinct", function(x, ...) { standardGeneric("approxCountDistinct") })
+
+#' @rdname column
+#' @export
+setGeneric("asc", function(x) { standardGeneric("asc") })
+
+#' @rdname column
+#' @export
+setGeneric("avg", function(x, ...) { standardGeneric("avg") })
+
+#' @rdname column
+#' @export
+setGeneric("cast", function(x, dataType) { standardGeneric("cast") })
+
+#' @rdname column
+#' @export
+setGeneric("contains", function(x, ...) { standardGeneric("contains") })
+#' @rdname column
+#' @export
+setGeneric("countDistinct", function(x, ...) { standardGeneric("countDistinct") })
+
+#' @rdname column
+#' @export
+setGeneric("desc", function(x) { standardGeneric("desc") })
+
+#' @rdname column
+#' @export
+setGeneric("endsWith", function(x, ...) { standardGeneric("endsWith") })
+
+#' @rdname column
+#' @export
+setGeneric("getField", function(x, ...) { standardGeneric("getField") })
+
+#' @rdname column
+#' @export
+setGeneric("getItem", function(x, ...) { standardGeneric("getItem") })
+
+#' @rdname column
+#' @export
+setGeneric("isNull", function(x) { standardGeneric("isNull") })
+
+#' @rdname column
+#' @export
+setGeneric("isNotNull", function(x) { standardGeneric("isNotNull") })
+
+#' @rdname column
+#' @export
+setGeneric("last", function(x) { standardGeneric("last") })
+
+#' @rdname column
+#' @export
+setGeneric("like", function(x, ...) { standardGeneric("like") })
+
+#' @rdname column
+#' @export
+setGeneric("lower", function(x) { standardGeneric("lower") })
+
+#' @rdname column
+#' @export
+setGeneric("rlike", function(x, ...) { standardGeneric("rlike") })
+
+#' @rdname column
+#' @export
+setGeneric("startsWith", function(x, ...) { standardGeneric("startsWith") })
+
+#' @rdname column
+#' @export
+setGeneric("sumDistinct", function(x) { standardGeneric("sumDistinct") })
+
+#' @rdname column
+#' @export
+setGeneric("upper", function(x) { standardGeneric("upper") })
+
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/group.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/group.R b/R/pkg/R/group.R
new file mode 100644
index 0000000..09fc0a7
--- /dev/null
+++ b/R/pkg/R/group.R
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# group.R - GroupedData class and methods implemented in S4 OO classes
+
+# Register the S3 class "jobj" with S4 so it can be used as a slot type below.
+setOldClass("jobj")
+
+#' @title S4 class that represents a GroupedData
+#' @description GroupedDatas can be created using groupBy() on a DataFrame
+#' @rdname GroupedData
+#' @seealso groupBy
+#'
+#' @param sgd A Java object reference to the backing Scala GroupedData
+#' @export
+setClass("GroupedData",
+ slots = list(sgd = "jobj"))
+
+# Custom initializer taking the backing reference as its only argument, so
+# that new("GroupedData", sgd) works with sgd passed positionally.
+setMethod("initialize", "GroupedData", function(.Object, sgd) {
+ .Object@sgd <- sgd
+ .Object
+})
+
+#' @rdname DataFrame
+groupedData <- function(sgd) {
+  # Wrap the Java GroupedData reference in the R-side S4 class; the custom
+  # initialize method receives sgd positionally.
+  new(Class = "GroupedData", sgd)
+}
+
+
+#' @rdname show
+setMethod("show", signature(object = "GroupedData"), function(object) {
+  # Printing a GroupedData only announces its type.
+  cat("GroupedData\n")
+})
+
+#' Count
+#'
+#' Count the rows in each group of a GroupedData.
+#' The grouping columns are kept in the resulting DataFrame.
+#'
+#' @param x a GroupedData
+#' @return a DataFrame
+#' @export
+#' @examples
+#' \dontrun{
+#' count(groupBy(df, "name"))
+#' }
+setMethod("count",
+          signature(x = "GroupedData"),
+          function(x) {
+            countedSdf <- callJMethod(x@sgd, "count")
+            dataFrame(countedSdf)
+          })
+
+#' Agg
+#'
+#' Compute aggregates for each group of a GroupedData.
+#' The resulting DataFrame will also contain the grouping columns.
+#' (A DataFrame method of this generic, if defined elsewhere, aggregates over
+#' the whole DataFrame without groups.)
+#'
+#' Aggregations are given in one of two forms, which cannot be mixed in a call:
+#'
+#' df2 <- agg(gd, <column> = <aggFunction>)
+#' df2 <- agg(gd, newColName = aggFunction(column))
+#'
+#' @param x a GroupedData
+#' @param ... either named character values mapping column names to aggregate
+#' function names, or Column expressions (argument names become aliases)
+#' @return a DataFrame
+#' @rdname agg
+#' @examples
+#' \dontrun{
+#' gd <- groupBy(df, "name")
+#' df2 <- agg(gd, age = "sum") # new column name will be created as 'SUM(age#0)'
+#' df2 <- agg(gd, ageSum = sum(df$age)) # Creates a new column named ageSum
+#' }
+setGeneric("agg", function(x, ...) { standardGeneric("agg") })
+
+setMethod("agg",
+          signature(x = "GroupedData"),
+          function(x, ...) {
+            cols <- list(...)
+            stopifnot(length(cols) > 0)
+            # Check every argument, not just the first, so mixed inputs get a
+            # clear error instead of failing later in the JVM or on `@jc`.
+            if (all(vapply(cols, is.character, logical(1)))) {
+              # Map form: column name -> aggregate function name.
+              cols <- varargsToEnv(...)
+              sdf <- callJMethod(x@sgd, "agg", cols)
+            } else if (all(vapply(cols, is, logical(1), "Column"))) {
+              # Column form: named arguments become column aliases. Index-based
+              # so duplicate names each alias their own column.
+              ns <- names(cols)
+              if (!is.null(ns)) {
+                for (i in seq_along(cols)) {
+                  if (ns[i] != "") {
+                    cols[[i]] <- alias(cols[[i]], ns[i])
+                  }
+                }
+              }
+              jcols <- lapply(cols, function(col) { col@jc })
+              # the GroupedData.agg(col, cols*) API does not contain grouping Column
+              sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "aggWithGrouping",
+                                 x@sgd, listToSeq(jcols))
+            } else {
+              stop("agg can only support Column or character")
+            }
+            dataFrame(sdf)
+          })
+
+
+# sum/mean/avg/min/max
+# Each name becomes a GroupedData method that forwards its ... arguments
+# (via toSeq) to the JVM GroupedData method of the same name and wraps the
+# result with dataFrame().
+methods <- c("sum", "mean", "avg", "min", "max")
+
+createMethod <- function(name) {
+  impl <- function(x, ...) {
+    dataFrame(callJMethod(x@sgd, name, toSeq(...)))
+  }
+  setMethod(name, signature(x = "GroupedData"), impl)
+}
+
+createMethods <- function() {
+  for (methodName in methods) {
+    createMethod(methodName)
+  }
+}
+
+createMethods()
+
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/jobj.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/jobj.R b/R/pkg/R/jobj.R
new file mode 100644
index 0000000..4180f14
--- /dev/null
+++ b/R/pkg/R/jobj.R
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# References to objects that exist on the JVM backend
+# are maintained using the jobj.
+
+# Reference counts of live jobj handles, keyed by backend object id.
+# This allows us to GC the java object when it is safe.
+.validJobjs <- new.env(hash = TRUE, parent = emptyenv())
+
+# Object ids whose backend objects are queued to be removed.
+.toRemoveJobjs <- new.env(hash = TRUE, parent = emptyenv())
+
+# Check if jobj was created with the current SparkContext: it is valid only
+# when .scStartTime is set and equals the appId stamped on the jobj.
+isValidJobj <- function(jobj) {
+  if (!exists(".scStartTime", envir = .sparkREnv)) {
+    return(FALSE)
+  }
+  jobj$appId == get(".scStartTime", envir = .sparkREnv)
+}
+
+# Wrap backend object `objId` in a new jobj handle and increment the
+# reference count recorded for that id in .validJobjs.
+getJobj <- function(objId) {
+  handle <- jobj(objId)
+  current <- if (exists(objId, envir = .validJobjs)) .validJobjs[[objId]] else 0
+  .validJobjs[[objId]] <- current + 1
+  handle
+}
+
+# Handler for a java object that exists on the backend.
+# Returns a "jobj" environment holding the object id and the appId
+# (the current .scStartTime), with cleanup.jobj registered as finalizer.
+jobj <- function(objId) {
+  if (!is.character(objId)) {
+    stop("object id must be a character")
+  }
+  # NOTE: An environment is used because finalizers can only be registered
+  # for environments or external reference pointers.
+  ref <- new.env(parent = emptyenv())
+  class(ref) <- "jobj"
+  assign("id", objId, envir = ref)
+  assign("appId", get(".scStartTime", envir = .sparkREnv), envir = ref)
+
+  # When this reference is garbage collected in R, cleanup.jobj releases
+  # its reference count.
+  reg.finalizer(ref, cleanup.jobj)
+  ref
+}
+
+#' Print a JVM object reference.
+#'
+#' This function prints the type and id for an object stored
+#' in the SparkR JVM backend.
+#'
+#' @param x The JVM object reference
+#' @param ... further arguments passed to or from other methods
+#' @return \code{x}, invisibly, as is conventional for print methods.
+print.jobj <- function(x, ...) {
+  # Ask the backend for the runtime class name of the referenced object.
+  cls <- callJMethod(x, "getClass")
+  name <- callJMethod(cls, "getName")
+  cat("Java ref type", name, "id", x$id, "\n", sep = " ")
+  invisible(x)
+}
+
+# Finalizer for jobj handles: decrement the reference count of a valid jobj
+# and, when it reaches zero, queue its id in .toRemoveJobjs.
+cleanup.jobj <- function(jobj) {
+  if (!isValidJobj(jobj)) {
+    return(invisible(NULL))
+  }
+  objId <- jobj$id
+  # If we don't know anything about this jobj, ignore it
+  if (!exists(objId, envir = .validJobjs)) {
+    return(invisible(NULL))
+  }
+  remaining <- .validJobjs[[objId]] - 1
+  .validJobjs[[objId]] <- remaining
+  if (remaining == 0) {
+    rm(list = objId, envir = .validJobjs)
+    # NOTE: removeJObject cannot be called here because the finalizer may run
+    # in the middle of another RPC. The id is queued instead, and the queued
+    # removals run when the next RPC is called.
+    .toRemoveJobjs[[objId]] <- 1
+  }
+}
+
+# Forget all reference counts and all pending removals.
+clearJobjs <- function() {
+  for (registry in list(.validJobjs, .toRemoveJobjs)) {
+    rm(list = ls(registry), envir = registry)
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/pairRDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
new file mode 100644
index 0000000..c2396c3
--- /dev/null
+++ b/R/pkg/R/pairRDD.R
@@ -0,0 +1,789 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Operations supported on RDDs that contain pairs (i.e. key, value)
+
+############ Actions and Transformations ############
+
+#' Look up elements of a key in an RDD
+#'
+#' @description
+#' \code{lookup} returns a list of all values in this RDD whose key is \code{key}.
+#'
+#' @param x The RDD to collect
+#' @param key The key to look up for
+#' @return a list of values in this RDD for key key
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(c(1, 1), c(2, 2), c(1, 3))
+#' rdd <- parallelize(sc, pairs)
+#' lookup(rdd, 1) # list(1, 3)
+#'}
+#' @rdname lookup
+#' @aliases lookup,RDD-method
+setMethod("lookup",
+          signature(x = "RDD", key = "ANY"),
+          function(x, key) {
+            # Keep the pairs whose key is identical to `key`, then drop the keys.
+            partitionFunc <- function(part) {
+              matching <- Filter(function(kv) identical(key, kv[[1]]), part)
+              lapply(matching, function(kv) { kv[[2]] })
+            }
+            collect(lapplyPartition(x, partitionFunc))
+          })
+
+#' Count the number of elements for each key, and return the result to the
+#' master as lists of (key, count) pairs.
+#'
+#' Same as countByKey in Spark.
+#'
+#' @param x The RDD to count keys.
+#' @return list of (key, count) pairs, where count is number of each key in rdd.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1)))
+#' countByKey(rdd) # ("a", 2L), ("b", 1L)
+#'}
+#' @rdname countByKey
+#' @aliases countByKey,RDD-method
+setMethod("countByKey",
+          signature(x = "RDD"),
+          function(x) {
+            # Project every pair onto its key, then count occurrences of each key.
+            countByValue(lapply(x, function(kv) { kv[[1]] }))
+          })
+
+#' Return an RDD with the keys of each tuple.
+#'
+#' @param x The RDD from which the keys of each tuple is returned.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
+#' collect(keys(rdd)) # list(1, 3)
+#'}
+#' @rdname keys
+#' @aliases keys,RDD
+setMethod("keys",
+          signature(x = "RDD"),
+          function(x) {
+            # The key is the first component of each pair.
+            lapply(x, function(kv) { kv[[1]] })
+          })
+
+#' Return an RDD with the values of each tuple.
+#'
+#' @param x The RDD from which the values of each tuple is returned.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
+#' collect(values(rdd)) # list(2, 4)
+#'}
+#' @rdname values
+#' @aliases values,RDD
+setMethod("values",
+          signature(x = "RDD"),
+          function(x) {
+            # The value is the second component of each pair.
+            lapply(x, function(kv) { kv[[2]] })
+          })
+
+#' Applies a function to all values of the elements, without modifying the keys.
+#'
+#' The same as \code{mapValues()} in Spark.
+#'
+#' @param X The RDD to apply the transformation.
+#' @param FUN the transformation to apply on the value of each element.
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' makePairs <- lapply(rdd, function(x) { list(x, x) })
+#' collect(mapValues(makePairs, function(x) { x * 2 }))
+#' # Output: list(list(1,2), list(2,4), list(3,6), ...)
+#'}
+#' @rdname mapValues
+#' @aliases mapValues,RDD,function-method
+setMethod("mapValues",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            # Rebuild each pair with the same key and the transformed value.
+            lapply(X, function(kv) { list(kv[[1]], FUN(kv[[2]])) })
+          })
+
+#' Pass each value in the key-value pair RDD through a flatMap function without
+#' changing the keys; this also retains the original RDD's partitioning.
+#'
+#' The same as \code{flatMapValues()} in Spark.
+#'
+#' @param X The RDD to apply the transformation.
+#' @param FUN the transformation to apply on the value of each element.
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
+#' collect(flatMapValues(rdd, function(x) { x }))
+#' # Output: list(list(1,1), list(1,2), list(2,3), list(2,4))
+#'}
+#' @rdname flatMapValues
+#' @aliases flatMapValues,RDD,function-method
+setMethod("flatMapValues",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            # Each pair expands into one (key, v) pair per element v of FUN(value).
+            flatMap(X, function(kv) {
+              lapply(FUN(kv[[2]]), function(v) { list(kv[[1]], v) })
+            })
+          })
+
+############ Shuffle Functions ############
+
+#' Partition an RDD by key
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
+#' For each element of this RDD, the partitioner is used to compute a hash
+#' function and the RDD is partitioned using this hash value.
+#'
+#' @param x The RDD to partition. Should be an RDD where each element is
+#'             list(K, V) or c(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @param ... Other optional arguments to partitionBy.
+#'
+#' @param partitionFunc The partition function to use. Uses a default hashCode
+#'                      function if not provided
+#' @return An RDD partitioned using the specified partitioner.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- partitionBy(rdd, 2L)
+#' collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4)
+#'}
+#' @rdname partitionBy
+#' @aliases partitionBy,RDD,integer-method
+setMethod("partitionBy",
+          signature(x = "RDD", numPartitions = "integer"),
+          function(x, numPartitions, partitionFunc = hashCode) {
+            # Serialize the partition function (after cleanClosure) so the JVM
+            # side can ship it to the R workers.
+            partitionFunc <- cleanClosure(partitionFunc)
+            serializedHashFuncBytes <- serialize(partitionFunc, connection = NULL)
+
+            # Package list and broadcast variables passed along with the function.
+            packageNamesArr <- serialize(.sparkREnv$.packages,
+                                         connection = NULL)
+            broadcastArr <- lapply(ls(.broadcastNames), function(name) {
+                                   get(name, .broadcastNames) })
+            jrdd <- getJRDD(x)
+
+            # We create a PairwiseRRDD that extends RDD[(Array[Byte],
+            # Array[Byte])], where the key is the hashed split, the value is
+            # the content (key-val pairs).
+            pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD",
+                                       callJMethod(jrdd, "rdd"),
+                                       as.integer(numPartitions),
+                                       serializedHashFuncBytes,
+                                       getSerializedMode(x),
+                                       packageNamesArr,
+                                       as.character(.sparkREnv$libname),
+                                       broadcastArr,
+                                       callJMethod(jrdd, "classTag"))
+
+            # Create a corresponding partitioner.
+            rPartitioner <- newJObject("org.apache.spark.HashPartitioner",
+                                       as.integer(numPartitions))
+
+            # Call partitionBy on the obtained PairwiseRDD.
+            javaPairRDD <- callJMethod(pairwiseRRDD, "asJavaPairRDD")
+            javaPairRDD <- callJMethod(javaPairRDD, "partitionBy", rPartitioner)
+
+            # Call .values() on the result to get back the final result, the
+            # shuffled actual content key-val pairs.
+            r <- callJMethod(javaPairRDD, "values")
+
+            RDD(r, serializedMode = "byte")
+          })
+
+#' Group values by key
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
+#' and groups the values for each key in the RDD into a single sequence.
+#'
+#' @param x The RDD to group. Should be an RDD where each element is
+#'             list(K, V) or c(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where each element is list(K, list(V))
+#' @seealso reduceByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- groupByKey(rdd, 2L)
+#' grouped <- collect(parts)
+#' grouped[[1]] # Should be a list(1, list(2, 4))
+#'}
+#' @rdname groupByKey
+#' @aliases groupByKey,RDD,integer-method
+setMethod("groupByKey",
+          signature(x = "RDD", numPartitions = "integer"),
+          function(x, numPartitions) {
+            shuffled <- partitionBy(x, numPartitions)
+            groupVals <- function(part) {
+              vals <- new.env()
+              keys <- new.env()
+              # Look only inside `keys` itself. new.env() has the calling frame
+              # as parent, so an inherited lookup would walk the whole search
+              # path for every new key and could match unrelated bindings.
+              pred <- function(item) exists(item$hash, envir = keys, inherits = FALSE)
+              appendList <- function(acc, i) {
+                addItemToAccumulator(acc, i)
+                acc
+              }
+              makeList <- function(i) {
+                acc <- initAccumulator()
+                addItemToAccumulator(acc, i)
+                acc
+              }
+              # Each item in the partition is list of (K, V)
+              lapply(part,
+                     function(item) {
+                       item$hash <- as.character(hashCode(item[[1]]))
+                       updateOrCreatePair(item, keys, vals, pred,
+                                          appendList, makeList)
+                     })
+              # extract out data field, trimmed to the number of items added
+              vals <- eapply(vals,
+                             function(i) {
+                               length(i$data) <- i$counter
+                               i$data
+                             })
+              # Every key in the environment contains a list
+              # Convert that to list(K, Seq[V])
+              convertEnvsToList(keys, vals)
+            }
+            lapplyPartition(shuffled, groupVals)
+          })
+
+#' Merge values by key
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
+#' and merges the values for each key using an associative reduce function.
+#'
+#' @param x The RDD to reduce by key. Should be an RDD where each element is
+#'             list(K, V) or c(K, V).
+#' @param combineFunc The associative reduce function to use.
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where each element is list(K, V') where V' is the merged
+#'         value
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- reduceByKey(rdd, "+", 2L)
+#' reduced <- collect(parts)
+#' reduced[[1]] # Should be a list(1, 6)
+#'}
+#' @rdname reduceByKey
+#' @aliases reduceByKey,RDD,integer-method
+setMethod("reduceByKey",
+          signature(x = "RDD", combineFunc = "ANY", numPartitions = "integer"),
+          function(x, combineFunc, numPartitions) {
+            reduceVals <- function(part) {
+              vals <- new.env()
+              keys <- new.env()
+              # Only look in `keys` itself, never in its parent frames.
+              pred <- function(item) exists(item$hash, envir = keys, inherits = FALSE)
+              lapply(part,
+                     function(item) {
+                       item$hash <- as.character(hashCode(item[[1]]))
+                       updateOrCreatePair(item, keys, vals, pred, combineFunc, identity)
+                     })
+              convertEnvsToList(keys, vals)
+            }
+            # Reduce within each partition first to shrink the shuffle, then
+            # reduce again after partitioning by key.
+            locallyReduced <- lapplyPartition(x, reduceVals)
+            shuffled <- partitionBy(locallyReduced, numPartitions)
+            lapplyPartition(shuffled, reduceVals)
+          })
+
+#' Merge values by key locally
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
+#' and merges the values for each key using an associative reduce function, but returns the
+#' results immediately to the driver as an R list.
+#'
+#' @param x The RDD to reduce by key. Should be an RDD where each element is
+#'             list(K, V) or c(K, V).
+#' @param combineFunc The associative reduce function to use.
+#' @return A list of elements of type list(K, V') where V' is the merged value for each key
+#' @seealso reduceByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' reduced <- reduceByKeyLocally(rdd, "+")
+#' reduced # list(list(1, 6), list(1.1, 3))
+#'}
+#' @rdname reduceByKeyLocally
+#' @aliases reduceByKeyLocally,RDD,integer-method
+setMethod("reduceByKeyLocally",
+          signature(x = "RDD", combineFunc = "ANY"),
+          function(x, combineFunc) {
+            reducePart <- function(part) {
+              vals <- new.env()
+              keys <- new.env()
+              # Only look in `keys` itself, never in its parent frames.
+              pred <- function(item) exists(item$hash, envir = keys, inherits = FALSE)
+              lapply(part,
+                     function(item) {
+                       item$hash <- as.character(hashCode(item[[1]]))
+                       updateOrCreatePair(item, keys, vals, pred, combineFunc, identity)
+                     })
+              list(list(keys, vals)) # return hash to avoid re-compute in merge
+            }
+            mergeParts <- function(accum, x) {
+              # accum[[1]] is a keys env from reducePart; its parent is that
+              # function's frame, so inherited lookups must be disabled here too.
+              pred <- function(item) {
+                exists(item$hash, envir = accum[[1]], inherits = FALSE)
+              }
+              lapply(ls(x[[1]]),
+                     function(name) {
+                       item <- list(x[[1]][[name]], x[[2]][[name]])
+                       item$hash <- name
+                       updateOrCreatePair(item, accum[[1]], accum[[2]], pred, combineFunc, identity)
+                     })
+              accum
+            }
+            reduced <- mapPartitions(x, reducePart)
+            merged <- reduce(reduced, mergeParts)
+            convertEnvsToList(merged[[1]], merged[[2]])
+          })
+
+#' Combine values by key
+#'
+#' Generic function to combine the elements for each key using a custom set of
+#' aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)],
+#' for a "combined type" C. Note that V and C can be different -- for example, one
+#' might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]).
+#'
+#' Users provide three functions:
+#' \itemize{
+#'   \item createCombiner, which turns a V into a C (e.g., creates a one-element list)
+#'   \item mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
+#'   \item mergeCombiners, to combine two C's into a single one (e.g., concatenates
+#'    two lists).
+#' }
+#'
+#' @param x The RDD to combine. Should be an RDD where each element is
+#'             list(K, V) or c(K, V).
+#' @param createCombiner Create a combiner (C) given a value (V)
+#' @param mergeValue Merge the given value (V) with an existing combiner (C)
+#' @param mergeCombiners Merge two combiners and return a new combiner
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where each element is list(K, C) where C is the combined type
+#'
+#' @seealso groupByKey, reduceByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- combineByKey(rdd, function(x) { x }, "+", "+", 2L)
+#' combined <- collect(parts)
+#' combined[[1]] # Should be a list(1, 6)
+#'}
+#' @rdname combineByKey
+#' @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method
+setMethod("combineByKey",
+          signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY",
+                    mergeCombiners = "ANY", numPartitions = "integer"),
+          function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) {
+            # The hash here is as.character(key). With inherited lookups, a key
+            # such as "sum" or "c" would be found in base and treated as already
+            # present, so `exists` must be restricted to the `keys` env itself.
+            combineLocally <- function(part) {
+              combiners <- new.env()
+              keys <- new.env()
+              pred <- function(item) exists(item$hash, envir = keys, inherits = FALSE)
+              lapply(part,
+                     function(item) {
+                       item$hash <- as.character(item[[1]])
+                       updateOrCreatePair(item, keys, combiners, pred, mergeValue, createCombiner)
+                     })
+              convertEnvsToList(keys, combiners)
+            }
+            locallyCombined <- lapplyPartition(x, combineLocally)
+            shuffled <- partitionBy(locallyCombined, numPartitions)
+            mergeAfterShuffle <- function(part) {
+              combiners <- new.env()
+              keys <- new.env()
+              pred <- function(item) exists(item$hash, envir = keys, inherits = FALSE)
+              lapply(part,
+                     function(item) {
+                       item$hash <- as.character(item[[1]])
+                       updateOrCreatePair(item, keys, combiners, pred, mergeCombiners, identity)
+                     })
+              convertEnvsToList(keys, combiners)
+            }
+            lapplyPartition(shuffled, mergeAfterShuffle)
+          })
+
+#' Aggregate a pair RDD by each key.
+#'
+#' Aggregate the values of each key in an RDD, using given combine functions
+#' and a neutral "zero value". This function can return a different result type,
+#' U, than the type of the values in this RDD, V. Thus, we need one operation
+#' for merging a V into a U and one operation for merging two U's. The former
+#' operation is used for merging values within a partition, and the latter is
+#' used for merging values between partitions. To avoid memory allocation, both
+#' of these functions are allowed to modify and return their first argument
+#' instead of creating a new U.
+#'
+#' @param x An RDD.
+#' @param zeroValue A neutral "zero value".
+#' @param seqOp A function to aggregate the values of each key. It may return
+#'              a different result type from the type of the values.
+#' @param combOp A function to aggregate results of seqOp.
+#' @return An RDD containing the aggregation result.
+#' @seealso foldByKey, combineByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+#' zeroValue <- list(0, 0)
+#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+#' aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
+#' # list(list(1, list(3, 2)), list(2, list(7, 2)))
+#'}
+#' @rdname aggregateByKey
+#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
+setMethod("aggregateByKey",
+          signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY",
+                    combOp = "ANY", numPartitions = "integer"),
+          function(x, zeroValue, seqOp, combOp, numPartitions) {
+            # The first value of a key becomes seqOp(zeroValue, v); do.call keeps
+            # string function names such as "+" working.
+            seedCombiner <- function(v) {
+              do.call(seqOp, list(zeroValue, v))
+            }
+            combineByKey(x,
+                         createCombiner = seedCombiner,
+                         mergeValue = seqOp,
+                         mergeCombiners = combOp,
+                         numPartitions = numPartitions)
+          })
+
+#' Fold a pair RDD by each key.
+#'
+#' Aggregate the values of each key in an RDD, using an associative function "func"
+#' and a neutral "zero value" which may be added to the result an arbitrary
+#' number of times, and must not change the result (e.g., 0 for addition, or
+#' 1 for multiplication.).
+#'
+#' @param x An RDD.
+#' @param zeroValue A neutral "zero value".
+#' @param func An associative function for folding values of each key.
+#' @return An RDD containing the aggregation result.
+#' @seealso aggregateByKey, combineByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+#' foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
+#'}
+#' @rdname foldByKey
+#' @aliases foldByKey,RDD,ANY,ANY,integer-method
+setMethod("foldByKey",
+          signature(x = "RDD", zeroValue = "ANY",
+                    func = "ANY", numPartitions = "integer"),
+          function(x, zeroValue, func, numPartitions) {
+            # A fold is an aggregate whose seqOp and combOp are the same function.
+            aggregateByKey(x,
+                           zeroValue = zeroValue,
+                           seqOp = func,
+                           combOp = func,
+                           numPartitions = numPartitions)
+          })
+
+############ Binary Functions #############
+
+#' Join two RDDs
+#'
+#' @description
+#' \code{join} This function joins two RDDs where every element is of the form list(K, V).
+#' The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return a new RDD containing all pairs of elements with matching keys in
+#'         two input RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' join(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3)))
+#'}
+#' @rdname join-methods
+#' @aliases join,RDD,RDD-method
+setMethod("join",
+          signature(x = "RDD", y = "RDD"),
+          function(x, y, numPartitions) {
+            # Tag values with their source (1L = x, 2L = y) before grouping by key.
+            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
+            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
+
+            doJoin <- function(v) {
+              joinTaggedList(v, list(FALSE, FALSE))
+            }
+
+            # Return the RDD directly (not via an assignment) so it is visible.
+            flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numToInt(numPartitions)),
+                          doJoin)
+          })
+
+#' Left outer join two RDDs
+#'
+#' @description
+#' \code{leftOuterJoin} This function left-outer-joins two RDDs where every element is of the form list(K, V).
+#' The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return For each element (k, v) in x, the resulting RDD will either contain
+#'         all pairs (k, (v, w)) for (k, w) in rdd2, or the pair (k, (v, NULL))
+#'         if no elements in rdd2 have key k.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' leftOuterJoin(rdd1, rdd2, 2L)
+#' # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
+#'}
+#' @rdname join-methods
+#' @aliases leftOuterJoin,RDD,RDD-method
+setMethod("leftOuterJoin",
+          signature(x = "RDD", y = "RDD", numPartitions = "integer"),
+          function(x, y, numPartitions) {
+            # Tag values with their source (1L = x, 2L = y) before grouping by key.
+            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
+            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
+
+            doJoin <- function(v) {
+              joinTaggedList(v, list(FALSE, TRUE))
+            }
+
+            # Return the RDD directly (not via an assignment) so it is visible.
+            flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
+          })
+
+#' Right outer join two RDDs
+#'
+#' @description
+#' \code{rightOuterJoin} This function right-outer-joins two RDDs where every element is of the form list(K, V).
+#' The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return For each element (k, w) in y, the resulting RDD will either contain
+#'         all pairs (k, (v, w)) for (k, v) in x, or the pair (k, (NULL, w))
+#'         if no elements in x have key k.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rightOuterJoin(rdd1, rdd2, 2L)
+#' # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
+#'}
+#' @rdname join-methods
+#' @aliases rightOuterJoin,RDD,RDD-method
+setMethod("rightOuterJoin",
+          signature(x = "RDD", y = "RDD", numPartitions = "integer"),
+          function(x, y, numPartitions) {
+            # Tag values with their source (1L = x, 2L = y) before grouping by key.
+            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
+            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
+
+            doJoin <- function(v) {
+              joinTaggedList(v, list(TRUE, FALSE))
+            }
+
+            # Return the RDD directly (not via an assignment) so it is visible.
+            flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
+          })
+
+#' Full outer join two RDDs
+#'
+#' @description
+#' \code{fullOuterJoin} This function full-outer-joins two RDDs where every element is of the form list(K, V).
+#' The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return For each element (k, v) in x and (k, w) in y, the resulting RDD
+#'         will contain all pairs (k, (v, w)) for both (k, v) in x and
+#'         (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements
+#'         in x/y have key k.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
+#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)),
+#'                               #      list(1, list(3, 1)),
+#'                               #      list(2, list(NULL, 4)),
+#'                               #      list(3, list(3, NULL)))
+#'}
+#' @rdname join-methods
+#' @aliases fullOuterJoin,RDD,RDD-method
+setMethod("fullOuterJoin",
+          signature(x = "RDD", y = "RDD", numPartitions = "integer"),
+          function(x, y, numPartitions) {
+            # Tag values with their source (1L = x, 2L = y) before grouping by key.
+            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
+            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
+
+            doJoin <- function(v) {
+              joinTaggedList(v, list(TRUE, TRUE))
+            }
+
+            # Return the RDD directly (not via an assignment) so it is visible.
+            flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
+          })
+
+#' For each key k in several RDDs, return a resulting RDD
+#' whose values are a list of values for the key in all RDDs.
+#'
+#' @param ... Several RDDs.
+#' @param numPartitions Number of partitions to create.
+#' @return a new RDD containing all pairs of elements with values in a list
+#' in all RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' cogroup(rdd1, rdd2, numPartitions = 2L)
+#' # list(list(1, list(1, list(2, 3))), list(2, list(list(4), list()))
+#'}
+#' @rdname cogroup
+#' @aliases cogroup,RDD-method
+setMethod("cogroup",
+          "RDD",
+          function(..., numPartitions) {
+            rdds <- list(...)
+            rddsLen <- length(rdds)
+            # Tag every value with the index of the RDD it came from.
+            for (i in seq_len(rddsLen)) {
+              rdds[[i]] <- lapply(rdds[[i]],
+                                  function(x) { list(x[[1]], list(i, x[[2]])) })
+              # TODO(hao): As issue [SparkR-142] mentions, the right value of i
+              # will not be captured into UDF if getJRDD is not invoked.
+              # It should be resolved together with that issue.
+              getJRDD(rdds[[i]]) # Capture the closure.
+            }
+            union.rdd <- Reduce(unionRDD, rdds)
+            group.func <- function(vlist) {
+              res <- list()
+              length(res) <- rddsLen
+              for (x in vlist) {
+                i <- x[[1]]
+                acc <- res[[i]]
+                # Create an accumulator.
+                if (is.null(acc)) {
+                  acc <- initAccumulator()
+                }
+                addItemToAccumulator(acc, x[[2]])
+                res[[i]] <- acc
+              }
+              lapply(res, function(acc) {
+                if (is.null(acc)) {
+                  list()
+                } else {
+                  # The accumulator's backing list can be longer than the number
+                  # of items added; trim it to `counter`, as groupByKey does.
+                  data <- acc$data
+                  length(data) <- acc$counter
+                  data
+                }
+              })
+            }
+            # Return the RDD directly (not via an assignment) so it is visible.
+            mapValues(groupByKey(union.rdd, numPartitions), group.func)
+          })
+
+#' Sort a (k, v) pair RDD by k.
+#'
+#' @param x A (k, v) pair RDD to be sorted.
+#' @param ascending A flag to indicate whether the sorting is ascending or descending.
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where all (k, v) pair elements are sorted.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
+#' collect(sortByKey(rdd)) # list (list(1, 3), list(2, 2), list(3, 1))
+#'}
+#' @rdname sortByKey
+#' @aliases sortByKey,RDD,RDD-method
+setMethod("sortByKey",
+          signature(x = "RDD"),
+          function(x, ascending = TRUE, numPartitions = SparkR::numPartitions(x)) {
+            rangeBounds <- list()
+
+            if (numPartitions > 1) {
+              rddSize <- count(x)
+              # constant from Spark's RangePartitioner
+              maxSampleSize <- numPartitions * 20
+              fraction <- min(maxSampleSize / max(rddSize, 1), 1.0)
+
+              samples <- collect(keys(sampleRDD(x, FALSE, fraction, 1L)))
+
+              # Note: the built-in R sort() function only works on atomic vectors.
+              # The bounds are always kept in ascending order, as in Spark's
+              # RangePartitioner. Descending order is handled by mirroring the
+              # partition index in rangePartitionFunc. Sorting them decreasingly
+              # here would break the `key > bound` scan and leave partitions empty.
+              samples <- sort(unlist(samples, recursive = FALSE))
+
+              if (length(samples) > 0) {
+                rangeBounds <- lapply(seq_len(numPartitions - 1),
+                                      function(i) {
+                                        j <- ceiling(length(samples) * i / numPartitions)
+                                        samples[j]
+                                      })
+              }
+            }
+
+            rangePartitionFunc <- function(key) {
+              partition <- 0
+
+              # TODO: Use binary search instead of linear search, similar with Spark
+              while (partition < length(rangeBounds) && key > rangeBounds[[partition + 1]]) {
+                partition <- partition + 1
+              }
+
+              if (ascending) {
+                partition
+              } else {
+                numPartitions - partition - 1
+              }
+            }
+
+            partitionFunc <- function(part) {
+              sortKeyValueList(part, decreasing = !ascending)
+            }
+
+            # partitionBy dispatches on an integer numPartitions.
+            newRDD <- partitionBy(x, as.integer(numPartitions), rangePartitionFunc)
+            lapplyPartition(newRDD, partitionFunc)
+          })
+
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/serialize.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/serialize.R b/R/pkg/R/serialize.R
new file mode 100644
index 0000000..8a9c0c6
--- /dev/null
+++ b/R/pkg/R/serialize.R
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Utility functions to serialize R objects so they can be read in Java.
+
+# Type mapping from R to Java
+#
+# NULL -> Void
+# integer -> Int
+# character -> String
+# logical -> Boolean
+# double, numeric -> Double
+# raw -> Array[Byte]
+# Date -> Date
+# POSIXct,POSIXlt -> Time
+#
+# list[T] -> Array[T], where T is one of above mentioned types
+# environment -> Map[String, T], where T is a native type
+# jobj -> Object, where jobj is an object created in the backend
+
writeObject <- function(con, object, writeType = TRUE) {
  # R has no scalar/vector distinction, so arrays cannot be sent as atomic
  # vectors and must be passed as lists. Dispatch uses only the first class,
  # e.g. a POSIXlt value has class c("POSIXlt", "POSIXt").
  rClass <- class(object)[[1]]

  # Emit the type tag unless the caller suppresses it (writeList does so for
  # list elements). The logical `writeType` argument does not hide the
  # writeType() function: R skips non-function bindings for calls.
  if (writeType) {
    writeType(con, rClass)
  }

  switch(rClass,
         NULL = writeVoid(con),
         integer = writeInt(con, object),
         character = writeString(con, object),
         logical = writeBoolean(con, object),
         double = ,
         numeric = writeDouble(con, object),
         raw = writeRaw(con, object),
         list = writeList(con, object),
         jobj = writeJobj(con, object),
         environment = writeEnv(con, object),
         Date = writeDate(con, object),
         POSIXlt = ,
         POSIXct = writeTime(con, object),
         stop(paste("Unsupported type for serialization", rClass)))
}
+
writeVoid <- function(con) {
  # Nothing to write: a NULL is represented solely by its type tag.
  NULL
}
+
writeJobj <- function(con, value) {
  # A jobj goes over the wire as its id string. Refuse handles that
  # isValidJobj() rejects.
  if (isValidJobj(value)) {
    writeString(con, value$id)
  } else {
    stop("invalid jobj ", value$id)
  }
}
+
writeString <- function(con, value) {
  # Writes a length-prefixed string: a big-endian int with the payload size,
  # then the string bytes plus writeBin()'s trailing NUL (hence the + 1).
  # The prefix must count BYTES, not characters. nchar() defaults to
  # characters, which under-counts any non-ASCII text and would desync the
  # reader. Converting to UTF-8 and writing with useBytes = TRUE keeps the
  # counted bytes and the written bytes identical, whatever the session's
  # native encoding.
  utfVal <- enc2utf8(value)
  writeInt(con, as.integer(nchar(utfVal, type = "bytes") + 1))
  writeBin(utfVal, con, endian = "big", useBytes = TRUE)
}
+
writeInt <- function(con, value) {
  # 4-byte big-endian integer; non-integer input is coerced first.
  asInt <- as.integer(value)
  writeBin(asInt, con, endian = "big")
}
+
writeDouble <- function(con, value) {
  # Binary value in big-endian byte order (8 bytes for a double).
  writeBin(object = value, con = con, endian = "big")
}
+
writeBoolean <- function(con, value) {
  # Booleans travel as ints: TRUE -> 1, FALSE -> 0.
  flag <- as.integer(value)
  writeInt(con, flag)
}
+
writeRawSerialize <- function(outputCon, batch) {
  # Serialize the whole batch in R's binary format, then send it as a single
  # length-prefixed raw payload.
  writeRaw(outputCon, serialize(batch, connection = NULL, ascii = FALSE))
}
+
writeRowSerialize <- function(outputCon, rows) {
  # Each row is serialized independently and written as its own
  # length-prefixed raw payload.
  writeOne <- function(row) {
    writeRaw(outputCon, serializeRow(row))
  }
  invisible(lapply(rows, writeOne))
}
+
serializeRow <- function(row) {
  # Encode one row into an in-memory buffer and return the bytes. The buffer
  # is closed on exit, after its value has been taken.
  buffer <- rawConnection(raw(0), "wb")
  on.exit(close(buffer), add = TRUE)
  writeRow(buffer, row)
  rawConnectionValue(buffer)
}
+
writeRow <- function(con, row) {
  # Writes the column count, then each column as a type-tagged object.
  numCols <- length(row)
  writeInt(con, numCols)
  # seq_len() rather than 1:numCols: for a zero-column row 1:0 is c(1, 0),
  # and row[[1]] would fail instead of writing just the count.
  for (i in seq_len(numCols)) {
    writeObject(con, row[[i]])
  }
}
+
writeRaw <- function(con, batch) {
  # Length prefix (number of bytes), then the bytes themselves.
  byteCount <- length(batch)
  writeInt(con, byteCount)
  writeBin(batch, con, endian = "big")
}
+
writeType <- function(con, class) {
  # One-character type tag that precedes each serialized value.
  tag <- switch(class,
                NULL = "n",
                integer = "i",
                character = "c",
                logical = "b",
                double = ,
                numeric = "d",
                raw = "r",
                list = "l",
                jobj = "j",
                environment = "e",
                Date = "D",
                POSIXlt = ,
                POSIXct = "t",
                stop(paste("Unsupported type for serialization", class)))
  writeBin(charToRaw(tag), con)
}
+
# Used to pass arrays where all the elements are of the same type.
# Format: element type tag, element count, then each element without its own tag.
writeList <- function(con, arr) {
  # All elements must share one type. As in writeObject(), only the first
  # class of each element counts. Classes such as POSIXct have length 2
  # (c("POSIXct", "POSIXt")); sapply() then built a matrix and the check
  # below failed even for homogeneous lists.
  elemType <- unique(vapply(arr, function(elem) class(elem)[[1]], character(1)))
  stopifnot(length(elemType) <= 1)

  # TODO: Empty lists are given type "character" right now.
  # This may not work if the Java side expects array of any other type.
  if (length(elemType) == 0) {
    elemType <- "character"
  }

  writeType(con, elemType)
  writeInt(con, length(arr))

  for (a in arr) {
    writeObject(con, a, FALSE)
  }
}
+
# Used to pass in hash maps required on Java side.
# Format: entry count, then (if non-zero) a list of keys and a list of values
# in the same order.
writeEnv <- function(con, env) {
  # List the keys once and derive the count from them. length(env) also
  # counts hidden (dot-prefixed) bindings that ls() omits, so it could
  # disagree with the keys actually sent.
  keys <- ls(env)
  len <- length(keys)

  writeInt(con, len)
  if (len > 0) {
    writeList(con, as.list(keys))
    vals <- lapply(keys, function(key) env[[key]])
    writeList(con, vals)
  }
}
+
writeDate <- function(con, date) {
  # Dates are sent in their character form (e.g. "2015-04-09").
  dateText <- as.character(date)
  writeString(con, dateText)
}
+
writeTime <- function(con, time) {
  # Times are sent as a double (seconds since the epoch for POSIXct).
  seconds <- as.double(time)
  writeDouble(con, seconds)
}
+
# Serializes a list of objects that may each have a different type.
# Every object is written as <object type> <object>, one after another.
writeArgs <- function(con, args) {
  for (arg in args) {
    writeObject(con, arg)
  }
}
+
writeStrings <- function(con, stringList) {
  # Flatten the list and write one string per text line.
  lines <- unlist(stringList)
  writeLines(lines, con)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/sparkR.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
new file mode 100644
index 0000000..bc82df0
--- /dev/null
+++ b/R/pkg/R/sparkR.R
@@ -0,0 +1,266 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
# Package-level environment holding SparkR session state (connections,
# context handles, start time, library path).
.sparkREnv <- new.env(hash = TRUE)
+
sparkR.onLoad <- function(libname, pkgname) {
  # Remember the library directory the package was loaded from.
  assign("libname", libname, envir = .sparkREnv)
}
+
# Reports whether `env` holds an open backend connection (.sparkRCon).
# Any error, e.g. an already closed connection, counts as "no connection".
connExists <- function(env) {
  tryCatch(
    exists(".sparkRCon", envir = env) && isOpen(env[[".sparkRCon"]]),
    error = function(e) FALSE
  )
}
+
#' Stop the Spark context.
#'
#' Stops the SparkContext created by sparkR.init() and shuts down the backend
#' JVM if this R session launched it. It also closes the backend and monitor
#' connections. Broadcast variables, jobj references and cached SQL/Hive
#' contexts are cleared, since none of them stay valid once the JVM side is
#' gone.
sparkR.stop <- function() {
  env <- .sparkREnv
  if (exists(".sparkRCon", envir = env)) {
    if (exists(".sparkRjsc", envir = env)) {
      sc <- get(".sparkRjsc", envir = env)
      callJMethod(sc, "stop")
      rm(".sparkRjsc", envir = env)
    }

    # Only a backend launched by sparkR.init() in this session is stopped.
    # Clear the flag too, so a later session attached to an externally
    # started backend does not try to shut that backend down.
    if (exists(".backendLaunched", envir = env)) {
      callJStatic("SparkRHandler", "stopBackend")
      rm(".backendLaunched", envir = env)
    }

    # Also close the connection and remove it from our env
    conn <- get(".sparkRCon", envir = env)
    close(conn)

    rm(".sparkRCon", envir = env)
    # .scStartTime may be missing if sparkR.init() failed before assigning
    # it; skip the rm() so it does not warn.
    if (exists(".scStartTime", envir = env)) {
      rm(".scStartTime", envir = env)
    }
  }

  if (exists(".monitorConn", envir = env)) {
    conn <- get(".monitorConn", envir = env)
    close(conn)
    rm(".monitorConn", envir = env)
  }

  # Cached SQL/Hive contexts wrap jobjs of the stopped context. Drop them so
  # sparkRSQL.init() / sparkRHive.init() build fresh ones after a restart
  # instead of returning stale handles.
  rm(list = intersect(c(".sparkRSQLsc", ".sparkRHivesc"), ls(env, all.names = TRUE)),
     envir = env)

  # Clear all broadcast variables we have
  # as the jobj will not be valid if we restart the JVM
  clearBroadcastVariables()

  # Clear jobj maps
  clearJobjs()
}
+
#' Initialize a new Spark Context.
#'
#' This function initializes a new SparkContext. If a context was already
#' created in this R session and not stopped with sparkR.stop(), that context
#' is returned instead and a notice is printed.
#'
#' @param master The Spark master URL.
#' @param appName Application name to register with cluster manager
#' @param sparkHome Spark Home directory
#' @param sparkEnvir Named list of properties forwarded to the SparkContext
#'        (the examples pass Spark configuration such as spark.executor.memory).
#' @param sparkExecutorEnv Named list of environment variables to be used when
#'        launching executors. If it does not set LD_LIBRARY_PATH, a default of
#'        "$LD_LIBRARY_PATH:" followed by the driver's LD_LIBRARY_PATH is used.
#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
#' @param sparkRLibDir Library directory containing the SparkR package on the
#'        worker nodes. When non-empty it replaces the library path recorded
#'        when the package was loaded.
#' @return The SparkContext handle returned by the backend.
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark")
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
#'                  list(spark.executor.memory="1g"))
#' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
#'                  list(spark.executor.memory="1g"),
#'                  list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
#'                  c("jarfile1.jar","jarfile2.jar"))
#'}
sparkR.init <- function(
  master = "",
  appName = "SparkR",
  sparkHome = Sys.getenv("SPARK_HOME"),
  sparkEnvir = list(),
  sparkExecutorEnv = list(),
  sparkJars = "",
  sparkRLibDir = "") {

  if (exists(".sparkRjsc", envir = .sparkREnv)) {
    cat("Re-using existing Spark Context. Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n")
    return(get(".sparkRjsc", envir = .sparkREnv))
  }

  jars <- suppressWarnings(normalizePath(as.character(sparkJars)))

  # Jar paths become file: URIs below. On Windows the URI needs four slashes,
  # see http://stackoverflow.com/a/18522792
  uriSep <- if (.Platform$OS.type == "unix") "//" else "////"

  existingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")
  if (existingPort != "") {
    # Attach to a backend that was started outside this R session.
    backendPort <- existingPort
  } else {
    path <- tempfile(pattern = "backend_port")
    launchBackend(
      args = path,
      sparkHome = sparkHome,
      jars = jars,
      sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"))
    # Poll for the port file with exponential backoff. 25 sleeps starting at
    # 0.1s and growing by 1.25x add up to roughly 100 seconds in total.
    wait <- 0.1
    for (i in 1:25) {
      Sys.sleep(wait)
      if (file.exists(path)) {
        break
      }
      wait <- wait * 1.25
    }
    if (!file.exists(path)) {
      stop("JVM is not ready after 100 seconds")
    }
    # The port file holds two ints, read in order: backend port, then
    # monitor port. Close it even if reading fails.
    f <- file(path, open = "rb")
    tryCatch({
      backendPort <- readInt(f)
      monitorPort <- readInt(f)
    }, finally = {
      close(f)
    })
    file.remove(path)
    if (length(backendPort) == 0 || backendPort == 0 ||
        length(monitorPort) == 0 || monitorPort == 0) {
      stop("JVM failed to launch")
    }
    assign(".monitorConn", socketConnection(port = monitorPort), envir = .sparkREnv)
    # Record that this session owns the backend so sparkR.stop() shuts it down.
    assign(".backendLaunched", 1, envir = .sparkREnv)
  }

  .sparkREnv$backendPort <- backendPort
  tryCatch({
    connectBackend("localhost", backendPort)
  }, error = function(err) {
    stop("Failed to connect JVM\n")
  })

  if (nchar(sparkHome) != 0) {
    sparkHome <- normalizePath(sparkHome)
  }

  if (nchar(sparkRLibDir) != 0) {
    .sparkREnv$libname <- sparkRLibDir
  }

  sparkEnvirMap <- new.env()
  for (varname in names(sparkEnvir)) {
    sparkEnvirMap[[varname]] <- sparkEnvir[[varname]]
  }

  sparkExecutorEnvMap <- new.env()
  if (!("LD_LIBRARY_PATH" %in% names(sparkExecutorEnv))) {
    # Default keeps the literal "$LD_LIBRARY_PATH" (presumably expanded on the
    # executor side) and appends the driver's value.
    sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <-
      paste0("$LD_LIBRARY_PATH:", Sys.getenv("LD_LIBRARY_PATH"))
  }
  for (varname in names(sparkExecutorEnv)) {
    sparkExecutorEnvMap[[varname]] <- sparkExecutorEnv[[varname]]
  }

  nonEmptyJars <- Filter(function(x) { x != "" }, jars)
  localJarPaths <- vapply(nonEmptyJars,
                          function(j) { utils::URLencode(paste0("file:", uriSep, j)) },
                          character(1))

  # Set the start time to identify jobjs
  # Seconds resolution is good enough for this purpose, so use ints
  assign(".scStartTime", as.integer(Sys.time()), envir = .sparkREnv)

  assign(
    ".sparkRjsc",
    callJStatic(
      "org.apache.spark.api.r.RRDD",
      "createSparkContext",
      master,
      appName,
      as.character(sparkHome),
      as.list(localJarPaths),
      sparkEnvirMap,
      sparkExecutorEnvMap),
    envir = .sparkREnv
  )

  sc <- get(".sparkRjsc", envir = .sparkREnv)

  # Register a finalizer to sleep 1 second on R exit to make RStudio happy
  reg.finalizer(.sparkREnv, function(x) { Sys.sleep(1) }, onexit = TRUE)

  sc
}
+
#' Initialize a new SQLContext.
#'
#' Creates a SQLContext from an existing JavaSparkContext by calling
#' org.apache.spark.sql.api.r.SQLUtils.createSQLContext. The result is cached
#' for the R session: later calls return the cached SQLContext, whatever
#' `jsc` they are given.
#'
#' @param jsc The existing JavaSparkContext created with sparkR.init()
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#'}
sparkRSQL.init <- function(jsc) {
  if (!exists(".sparkRSQLsc", envir = .sparkREnv)) {
    newCtx <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
                          "createSQLContext",
                          jsc)
    assign(".sparkRSQLsc", newCtx, envir = .sparkREnv)
  }
  get(".sparkRSQLsc", envir = .sparkREnv)
}
+
#' Initialize a new HiveContext.
#'
#' This function creates a HiveContext from an existing JavaSparkContext. The
#' result is cached for the R session: later calls return the cached
#' HiveContext, whatever `jsc` they are given.
#'
#' @param jsc The existing JavaSparkContext created with sparkR.init()
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRHive.init(sc)
#'}
sparkRHive.init <- function(jsc) {
  if (exists(".sparkRHivesc", envir = .sparkREnv)) {
    return(get(".sparkRHivesc", envir = .sparkREnv))
  }

  ssc <- callJMethod(jsc, "sc")
  hiveCtx <- tryCatch({
    newJObject("org.apache.spark.sql.hive.HiveContext", ssc)
  }, error = function(err) {
    # A Spark build without Hive is the expected cause, but append the
    # underlying message so other failures are not hidden behind it.
    stop("Spark SQL is not built with Hive support: ", conditionMessage(err),
         call. = FALSE)
  })

  assign(".sparkRHivesc", hiveCtx, envir = .sparkREnv)
  hiveCtx
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org